最后的项目: 构建多线程 web server - 优雅停机与清理

优质
小牛编辑
125浏览
2023-12-01

示例 20-21 中的代码如期通过使用线程池异步的响应请求。这里有一些警告说 workersidthread 字段没有直接被使用,这提醒了我们并没有清理所有的内容。当使用不那么优雅的 ctrl-C 终止主线程时,所有其他线程也会立刻停止,即便它们正处于处理请求的过程中。

现在我们要为 ThreadPool 实现 Drop trait 对线程池中的每一个线程调用 join,这样这些线程将会执行完他们的请求。接着会为 ThreadPool 实现一个告诉线程他们应该停止接收新请求并结束的方式。为了实践这些代码,修改 server 在优雅停机(graceful shutdown)之前只接受两个请求。

现在开始为线程池实现 Drop。当线程池被丢弃时,应该 join 所有线程以确保他们完成其操作。示例 20-23 展示了 Drop 实现的第一次尝试;这些代码还不能够编译:

文件名: src/lib.rs

示例 20-23: 当线程池离开作用域时 join 每个线程

这里首先遍历线程池中的每个 workers。这里使用了 &mut 因为 self 本身是一个可变引用而且也需要能够修改 worker。对于每一个线程,会打印出说明信息表明此特定 worker 正在关闭,接着在 worker 线程上调用 join。如果 join 调用失败,通过 unwrap 使得 panic 并进行不优雅的关闭。

如下是尝试编译代码时得到的错误:

  1. error[E0507]: cannot move out of borrowed content
  2. --> src/lib.rs:65:13
  3. |
  4. 65 | worker.thread.join().unwrap();
  5. | ^^^^^^ cannot move out of borrowed content

这告诉我们并不能调用 join,因为只有每一个 worker 的可变借用,而 join 获取其参数的所有权。为了解决这个问题,需要一个方法将 thread 移动出拥有其所有权的 Worker 实例以便 join 可以消费这个线程。示例 17-15 中我们曾见过这么做的方法:如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上调用 take 方法将值从 Some 成员中移动出来而对 None 成员不做处理。换句话说,正在运行的 Workerthread 将是 Some 成员值,而当需要清理 worker 时,将 Some 替换为 None,这样 worker 就没有可以运行的线程了。

为此需要更新 Worker 的定义为如下:

文件名: src/lib.rs

  1. # use std::thread;
  2. struct Worker {
  3. id: usize,
  4. thread: Option<thread::JoinHandle<()>>,
  5. }

现在依靠编译器来找出其他需要修改的地方。check 代码会得到两个错误:

  1. error[E0599]: no method named `join` found for type
  2. `std::option::Option<std::thread::JoinHandle<()>>` in the current scope
  3. --> src/lib.rs:65:27
  4. |
  5. 65 | worker.thread.join().unwrap();
  6. | ^^^^
  7. error[E0308]: mismatched types
  8. --> src/lib.rs:89:13
  9. |
  10. 89 | thread,
  11. | ^^^^^^
  12. | |
  13. | expected enum `std::option::Option`, found struct
  14. `std::thread::JoinHandle`
  15. | help: try using a variant of the expected type: `Some(thread)`
  16. |
  17. = note: expected type `std::option::Option<std::thread::JoinHandle<()>>`
  18. found type `std::thread::JoinHandle<_>`

让我们修复第二个错误,它指向 Worker::new 结尾的代码;当新建 Worker 时需要将 thread 值封装进 Some。做出如下改变以修复问题:

文件名: src/lib.rs

文件名: src/lib.rs

  1. impl Drop for ThreadPool {
  2. fn drop(&mut self) {
  3. for worker in &mut self.workers {
  4. println!("Shutting down worker {}", worker.id);
  5. if let Some(thread) = worker.thread.take() {
  6. thread.join().unwrap();
  7. }
  8. }
  9. }
  10. }

如第十七章我们见过的,Option 上的 take 方法会取出 Some 而留下 None。使用 if let 解构 Some 并得到线程,接着在线程上调用 join。如果 worker 的线程已然是 None,就知道此时这个 worker 已经清理了其线程所以无需做任何操作。

向线程发送信号使其停止接收任务

有了这些修改,代码就能编译且没有任何警告。不过也有坏消息,这些代码还不能以我们期望的方式运行。问题的关键在于 Worker 中分配的线程所运行的闭包中的逻辑:调用 并不会关闭线程,因为他们一直 loop 来寻找任务。如果采用这个实现来尝试丢弃 ThreadPool ,则主线程会永远阻塞在等待第一个线程结束上。

为了修复这个问题,修改线程既监听是否有 Job 运行也要监听一个应该停止监听并退出无限循环的信号。所以通道将发送这个枚举的两个成员之一而不是 Job 实例:

文件名: src/lib.rs

  1. # struct Job;
  2. enum Message {
  3. NewJob(Job),
  4. Terminate,
  5. }

Message 枚举要么是存放了线程需要运行的 JobNewJob 成员,要么是会导致线程退出循环并终止的 Terminate 成员。

同时需要修改通道来使用 Message 类型值而不是 Job,如示例 20-24 所示:

文件名: src/lib.rs

  1. pub struct ThreadPool {
  2. workers: Vec<Worker>,
  3. sender: mpsc::Sender<Message>,
  4. }
  5. // --snip--
  6. impl ThreadPool {
  7. // --snip--
  8. pub fn execute<F>(&self, f: F)
  9. where
  10. F: FnOnce() + Send + 'static
  11. let job = Box::new(f);
  12. self.sender.send(Message::NewJob(job)).unwrap();
  13. }
  14. }
  15. // --snip--
  16. impl Worker {
  17. fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
  18. Worker {
  19. let thread = thread::spawn(move ||{
  20. loop {
  21. let message = receiver.lock().unwrap().recv().unwrap();
  22. match message {
  23. Message::NewJob(job) => {
  24. println!("Worker {} got a job; executing.", id);
  25. job.call_box();
  26. },
  27. Message::Terminate => {
  28. println!("Worker {} was told to terminate.", id);
  29. break;
  30. },
  31. }
  32. }
  33. });
  34. Worker {
  35. id,
  36. thread: Some(thread),
  37. }
  38. }
  39. }

示例 20-24: 收发 Message 值并在 Worker 收到 Message::Terminate 时退出循环

为了适用 Message 枚举需要将两个地方的 Job 修改为 MessageThreadPool 的定义和 Worker::new 的签名。ThreadPoolexecute 方法需要发送封装进 Message::NewJob 成员的任务。然后,在 Worker::new 中当从通道接收 Message 时,当获取到 NewJob成员会处理任务而收到 Terminate 成员则会退出循环。

通过这些修改,代码再次能够编译并继续按照期望的行为运行。不过还是会得到一个警告,因为并没有创建任何 Terminate 成员的消息。如示例 20-25 所示修改 Drop 实现来修复此问题:

文件名: src/lib.rs

示例 20-25:在对每个 worker 线程调用 join 之前向 worker 发送 Message::Terminate

为了更好的理解为什么需要两个分开的循环,想象一下只有两个 worker 的场景。如果在一个单独的循环中遍历每个 worker,在第一次迭代中向通道发出终止消息并对第一个 worker 线程调用 join。我们会一直等待第一个 worker 结束,不过它永远也不会结束因为第二个线程接收了终止消息。死锁!

为了避免此情况,首先在一个循环中向通道发出所有的 Terminate 消息,接着在另一个循环中 join 所有的线程。每个 worker 一旦收到终止消息即会停止从通道接收消息,意味着可以确保如果发送同 worker 数相同的终止消息,在 join 之前每个线程都会收到一个终止消息。

为了实践这些代码,如示例 20-26 所示修改 main 在优雅停机 server 之前只接受两个请求:

文件名: src/bin/main.rs

  1. fn main() {
  2. let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  3. for stream in listener.incoming().take(2) {
  4. let stream = stream.unwrap();
  5. pool.execute(|| {
  6. handle_connection(stream);
  7. });
  8. }
  9. println!("Shutting down.");
  10. }

示例 20-26: 在处理两个请求之后通过退出循环来停止 server

你不会希望真实世界的 web server 只处理两次请求就停机了,这只是为了展示优雅停机和清理处于正常工作状态。

take 方法定义于 Iterator trait,这里限制循环最多头 2 次。ThreadPool 会在 main 的结尾离开作用域,而且还会看到 drop 实现的运行。

使用 cargo run 启动 server,并发起三个请求。第三个请求应该会失败,而终端的输出应该看起来像这样:

  1. $ cargo run
  2. Compiling hello v0.1.0 (file:///projects/hello)
  3. Finished dev [unoptimized + debuginfo] target(s) in 1.0 secs
  4. Running `target/debug/hello`
  5. Worker 0 got a job; executing.
  6. Worker 3 got a job; executing.
  7. Shutting down.
  8. Sending terminate message to all workers.
  9. Shutting down all workers.
  10. Shutting down worker 0
  11. Worker 2 was told to terminate.
  12. Worker 0 was told to terminate.
  13. Worker 3 was told to terminate.
  14. Shutting down worker 1
  15. Shutting down worker 2
  16. Shutting down worker 3

可能会出现不同顺序的 worker 和信息输出。可以从信息中看到服务是如何运行的: worker 0 和 worker 3 获取了头两个请求,接着在第三个请求时,我们停止接收连接。当 ThreadPoolmain 的结尾离开作用域时,其 Drop 实现开始工作,线程池通知所有线程终止。每个 worker 在收到终止消息时会打印出一个信息,接着线程池调用 join 来终止每一个 worker 线程。

这个特定的运行过程中一个有趣的地方在于:注意我们向通道中发出终止消息,而在任何线程收到消息之前,就尝试 join worker 0 了。worker 0 还没有收到终止消息,所以主线程阻塞直到 worker 0 结束。与此同时,每一个线程都收到了终止消息。一旦 worker 0 结束,主线程就等待其他 worker 结束,此时他们都已经收到终止消息并能够停止了。

恭喜!现在我们完成了这个项目,也有了一个使用线程池异步响应请求的基础 web server。我们能对 server 执行优雅停机,它会清理线程池中的所有线程。如下是完整的代码参考:

文件名: src/bin/main.rs

  1. extern crate hello;
  2. use hello::ThreadPool;
  3. use std::io::prelude::*;
  4. use std::net::TcpListener;
  5. use std::net::TcpStream;
  6. use std::fs::File;
  7. use std::thread;
  8. use std::time::Duration;
  9. fn main() {
  10. let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
  11. let pool = ThreadPool::new(4);
  12. for stream in listener.incoming().take(2) {
  13. let stream = stream.unwrap();
  14. pool.execute(|| {
  15. handle_connection(stream);
  16. });
  17. }
  18. println!("Shutting down.");
  19. }
  20. fn handle_connection(mut stream: TcpStream) {
  21. let mut buffer = [0; 512];
  22. stream.read(&mut buffer).unwrap();
  23. let get = b"GET / HTTP/1.1\r\n";
  24. let sleep = b"GET /sleep HTTP/1.1\r\n";
  25. let (status_line, filename) = if buffer.starts_with(get) {
  26. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  27. } else if buffer.starts_with(sleep) {
  28. thread::sleep(Duration::from_secs(5));
  29. ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
  30. } else {
  31. ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
  32. };
  33. let mut file = File::open(filename).unwrap();
  34. let mut contents = String::new();
  35. file.read_to_string(&mut contents).unwrap();
  36. let response = format!("{}{}", status_line, contents);
  37. stream.write(response.as_bytes()).unwrap();
  38. stream.flush().unwrap();
  39. }

文件名: src/lib.rs

  • ThreadPool 和其公有方法增加更多文档
  • 为库的功能增加测试
  • 将 调用改为更健壮的错误处理
  • 使用 ThreadPool 进行其他不同于处理网络请求的任务

总结

好极了!你结束了本书的学习!由衷感谢你与我们一道加入这次 Rust 之旅。现在你已经准备好出发并实现自己的 Rust 项目并帮助他人了。请不要忘记我们的社区,这里有其他 Rustaceans 正乐于帮助你迎接 Rust 之路上的任何挑战。