当前位置: 首页 > 工具软件 > Tokio > 使用案例 >

Tokio教程之通道

陈功
2023-12-01

通道

Tokio教程之通道

https://tokio.rs/tokio/tutorial/channels

假设我们想运行两个并发的Redis命令。我们可以为每个命令生成一个任务。那么这两条命令就会同时发生。

起初,我们可能会尝试类似的做法。

use mini_redis::client;

#[tokio::main]
async fn main() {
    // Establish a connection to the server
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Spawn two tasks, one gets a key, the other sets a key
    let t1 = tokio::spawn(async {
        let res = client.get("hello").await;
    });

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });

    t1.await.unwrap();
    t2.await.unwrap();
}

Copy

这不会被编译,因为两个任务都需要以某种方式访问 client。由于Client没有实现Copy,如果没有一些代码来促进这种共享,它将不会被编译。此外,Client::set 需要 &mut self,这意味着调用它需要独占访问。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用 std::sync::Mutex,因为 .await 需要在持有锁的情况下被调用。我们可以使用 tokio::sync::Mutex,但这只允许一个飞行中的请求。如果客户端实现了管道化,那么异步Mutex会导致连接的利用率不足。

消息传递

答案是使用消息传递。这种模式包括产生一个专门的任务来管理 client 资源。任何希望发出请求的任务都会向 client 任务发送一个消息。client 任务代表发送者发出请求,并将响应发回给发送者。

使用这种策略,就可以建立一个单一的连接。管理 client 的任务能够获得排他性的访问,以便调用get和set。此外,通道作为一个缓冲区工作。在 client 任务忙碌的时候,操作可以被发送到 client 任务。一旦 client 任务可以处理新的请求,它就从通道中拉出下一个请求。这可以带来更好的吞吐量,并且可以扩展到支持连接池。

Tokio的通道原语

Tokio提供一些通道(channel),每个通道都有不同的用途:

  • mpsc:多生产者,单消费者通道。可以发送许多数值。
  • oneshot:单生产者,单消费者通道。可以发送一个单一的值。
  • broadcast: 多生产者,多消费者。可以发送许多值。每个接收者看到每个值。
  • watch:单生产者,多消费者。可以发送许多值,但不保留历史。接收者只看到最近的值。

如果你需要一个多生产者多消费者的通道,只有一个消费者看到每个消息,你可以使用 async-channel crate。也有一些通道用于异步Rust之外,比如 std::sync::mpsc 和crossbeam::channel。这些通道通过阻塞线程来等待消息,这在异步代码中是不允许的。

在本节中,我们将使用 mpsc 和 oneshot。其他类型的消息传递通道将在后面的章节中进行探讨。本节的完整代码可在此找到。

定义消息类型

在大多数情况下,当使用消息传递时,接收消息的任务会对一个以上的命令做出响应。在我们的例子中,该任务将对GET和SET命令做出响应。为了对此进行建模,我们首先定义一个Command枚举,并为每个命令类型包含一个变量。

use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}	

Copy

创建通道

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Create a new channel with a capacity of at most 32.
    let (tx, mut rx) = mpsc::channel(32);

    // ... Rest comes here
}

Copy

mpsc 通道被用来向管理redis连接的任务发送命令。多生产者的能力允许从许多任务发送消息。创建通道会返回两个值,一个发送者和一个接收者。这两个句柄是单独使用的。它们可以被转移到不同的任务。

通道的创建容量为32。如果消息的发送速度比接收速度快,通道将储存这些消息。一旦通道中存储了32条消息,调用 send(...).await 将进入睡眠状态,直到有消息被接收者删除。

从多个任务中发送是通过 cloning sender 来完成的。比如说。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await;
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await;
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

Copy

两条信息都被发送到单一的 Receiver 句柄。不可能克隆一个 mpsc 通道的Receiver。

当每个 sender 都超出了范围或被放弃时,就不再可能向通道发送更多的消息了。在这一点上,接收器上的 recv 调用将返回None,这意味着所有的发送者都消失了,通道被关闭。

在我们管理 Redis 连接的任务中,它知道一旦通道关闭,它就可以关闭 Redis 连接,因为该连接将不会再被使用。

生成管理器任务

接下来,生成一个任务,处理来自通道的消息。首先,一个客户端连接被建立到Redis。然后,通过Redis连接发出收到的命令。

use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
    // Establish a connection to the server
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Start receiving messages
    while let Some(cmd) = rx.recv().await {
        use Command::*;

        match cmd {
            Get { key } => {
                client.get(&key).await;
            }
            Set { key, val } => {
                client.set(&key, val).await;
            }
        }
    }
});

Copy

现在,更新这两个任务,通过通道发送命令,而不是直接在Redis连接上发布。

// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let tx2 = tx.clone();

// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "hello".to_string(),
    };

    tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };

    tx2.send(cmd).await.unwrap();
});

Copy

在 main 函数的底部,我们 .await join 句柄,以确保在进程退出前完全完成命令。

t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();

Copy

接收响应

最后一步是接收来自管理器任务的响应。GET命令需要获得数值,SET命令需要知道操作是否成功完成。

为了传递响应,使用了一个 oneshot 通道。oneshot 通道是一个单一生产者、单一消费者的通道,为发送单一数值而优化。在我们的例子中,这个单一的值就是响应。

与mpsc类似,oneshot::channel() 返回一个 sender 和 receiver 句柄。

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

Copy

与mpsc不同,没有指定容量,因为容量总是1。此外,两个句柄都不能被克隆。

为了接收来自管理任务的响应,在发送命令之前,创建了一个 oneshot 通道。该通道的 Sender 部分被包含在给管理任务的命令中。receive 部分用来接收响应。

首先,更新 Command 以包括 Sender。为了方便起见,使用一个类型别名来引用 Sender。

use tokio::sync::oneshot;
use bytes::Bytes;

/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Vec<u8>,
        resp: Responder<()>,
    },
}

/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

Copy

现在,更新发布命令的任务,包括 oneshot::Sender

let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "hello".to_string(),
        resp: resp_tx,
    };

    // Send the GET request
    tx.send(cmd).await.unwrap();

    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: b"bar".to_vec(),
        resp: resp_tx,
    };

    // Send the SET request
    tx2.send(cmd).await.unwrap();

    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

Copy

最后,更新管理任务,通过 oneshot 的通道发送响应。

while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // Ignore errors
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val.into()).await;
            // Ignore errors
            let _ = resp.send(res);
        }
    }
}

Copy

在 oneshot::Sender 上调用 send 会立即完成,不需要 .await。这是因为在 oneshot 通道上的发送将总是立即失败或成功,而不需要任何形式的等待。

在 oneshot 通道上发送一个值,当接收方的一半已经放弃时,返回 Err。这表明接收方对响应不再感兴趣了。在我们的方案中,接收方取消兴趣是一个可接受的事件。resp.send(...)返回的Err不需要被处理。

背压和有界通道

无论何时引入并发或队列,都必须确保队列是有界的,系统将优雅地处理负载。无界的队列最终会占用所有可用的内存,导致系统以不可预测的方式失败。

Tokio 小心避免隐性队列。这其中很大一部分是由于异步操作是 lazy 的。请考虑以下情况。

loop {
    async_op();
}

Copy

如果异步操作急切地运行,循环将重复排队运行一个新的 async_op,而不确保之前的操作完成。这就导致了隐性的无边界队列。基于回调的系统和基于急切的 future 的系统特别容易受此影响。

然而,在Tokio和异步Rust中,上述片段根本不会导致 async_op 的运行。这是因为 .await 从未被调用。如果该片段被更新为使用 .await,那么循环会等待操作完成后再重新开始。

loop {
    // Will not repeat until `async_op` completes
    async_op().await;
}

Copy

必须明确地引入并发和队列。做到这一点的方法包括:

  • tokio::spawn
  • select!
  • join!
  • mpsc::channel

在这样做的时候,要注意确保并发的总量是有界限的。例如,当编写一个TCP接受循环时,要确保打开的套接字的总数是有限制的。当使用 mpsc::channel 时,选择一个可管理的通道容量。具体的约束值将是特定于应用的。

注意和挑选好的界限是编写可靠的 tokio 应用程序的一个重要部分。

 类似资料: