1.8 管道和多路复用器

优质
小牛编辑
141浏览
2023-12-01

管道和多路复用器

延迟情况是难以忍受的。现代计算机能以惊人的速度生成数据,并且高速互联网(经常是在重要的服务器之间有多个并行连接)提供了极大的带宽,但是这可恶的延迟意味着电脑花了大量时间等待数据。基于延续的编程变得越来越流行的几个原因之一。让我们考虑一些规则的程序代码:

string a = db.StringGet("a");
string b = db.StringGet("b");

按照这些关联的步骤,这看起来像:

    [req1]                         # 客户端 : 客户端库构造出一个请求1
         [c=>s]                    # 网络   : 请求1被发送到服务器
              [server]             # 服务器 : 服务器处理请求1
                     [s=>c]        # 网络   : 响应1被发送回客户端
                          [resp1]  # 客户端 : 客户端库解析响应数据1
                                [req2]
                                     [c=>s]
                                          [server]
                                                 [s=>c]
                                                      [resp2]

现在让我们突出客户端处理的部分:

[req1]
     [====waiting=====]
                      [resp1]
                            [req2]
                                 [====waiting=====]
                                                  [resp2]

记住这是不可测量的,如果是用时间来衡量,那么会一直等待下去(一直耗费时间在等待处理)。

管道

由于这个原因,很多Redis客户端允许你利用管道,处理发送多个消息而无需等待每一个的回复,并且当消息进来的时候,回复的处理将会延后。在.NET中,一个操作可以被初始化且尚未完成;在完成后或者发生错误后由TPL.aspx)通过 Task.aspx)/Task\.aspx) 的API封装。本质上,Task\ 表示的是一个"将来可能的T类型的值"(非泛型 Task 实际上是 Task\)。你可以二选其一:

  • .Wait() (阻塞执行,直到任务完成)
  • .ContinueWith(...)或者await (创建一个在目标任务完成时异步执行的延续任务)

例如:下面是Redis客户端利用管道的示例代码:

var aPending = db.StringGetAsync("a");
var bPending = db.StringGetAsync("b");
var a = db.Wait(aPending);
var b = db.Wait(bPending);

注意:在这里我使用了 db.Wait 因为他会自动的应用同步超时配置,如你你喜欢的话,你也可以使用 aPending.Wait() 或者 Task.WaitAll(aPending, bPending);使用管道允许我们在网络中立即得到两个请求,从而消除大部分的延时。此外,它也可以帮助我们减少包碎片:20个请求单独的发送(等待每个响应)至少需要20个包,但是在管道中发送20个请求只需要少数几个包(甚至只需要一个包)。

即发即弃

一个特别的管道案例是当我们不关心操作的响应,允许代码继续执行且排队操作是在后台处理的时候。这通常意味着我们能把并发工作放在来自一个单独调用的连接中。我们可以使用 flags 参数来实现:

// 可调期限
db.KeyExpire(key, TimeSpan.FromMinutes(5), flags: CommandFlags.FireAndForget);
var value = (string)db.StringGet(key);

FireAndForget 标记会使客户端库去正常的排队工作,但是会立即返回一个默认值(KeyExpire 会返回一个 bool 类型,这将返回 false,因为默认值是 false - 然而返回的是毫无意义的值,我们应该忽略)。*Async 方法也会返回一个已完成的 Task\ 作为默认值(或者一个已完成的 Task 作为 void 返回 )。

多路复用(Multiplexing)

使用管道处理技术是非常好的,但是我们经常单独使用阻塞代码仅去取一个单独的值(或者可能只执行一些操作,这取决于各自的需要)。这意味着我们仍然有这样一个问题:我们花费大量的时间去等待数据从客户端传输到服务器端。现在我们考虑一个繁忙的应用,这可能是一个web服务。这类应用通常都具有高并发性,当你有20个并行应用请求所有需要的数据,你可能想旋转(spinning up)这20个连接,或者你可以同步访问一个单独连接(这意味着最后的调用者需要等待前面19个全部执行完成才开始)。或者作为一个妥协方式,也许是个出租5个连接的连接池--不管你怎么做,都会有大量的等待操作。StackExchange.Redis 不需要那样做;反而,它为你做了大量的工作,通过多路复用单个连接,使你可以有效的利用空余的时间。当不同的调用方同时访问时,它会自动使用管道分离访问请求,所以无论使用阻塞方式或者异步方式访问,这些工作都是被管道处理的。因此我们可以有10或者20个先前的"get a and b"的场景(来自不同应用的请求),并且它们会尽快的取得连接。从本质上讲,它填补了 waiting 时间与其他调用方的工作。

因此,StackExchange.Redis不会提供(并将永远不会提供)"阻塞弹出(blocking pops)"(BLPOP, BRPOP 以及 BRPOPLPUSH) - 因为这将允许一个单独的调用方拖延整个多路复用器,进而阻塞所有的调用方。 StackExchange.Redis 需要保持的工作是为了验证某个事务的前提条件,这就是为什么StackExchange.Redis封装了这样的条件在内部管理 Condition 实例。更多事务信息。如果你想要"阻塞弹出(blocking pops)",那么我强烈建议你考虑使用发布/订阅功功能:

sub.Subscribe(channel, delegate {
    string work = db.ListRightPop(key);
    if (work != null) Process(work);
});
//...
db.ListLeftPush(key, newWork, flags: CommandFlags.FireAndForget);
sub.Publish(channel, "");

注意:无需阻塞操作即可达到相同的目的有:

  • 数据不是通过发布/订阅发送的;发布/订阅API仅被用来通知工人来检查更多的工作

  • 如果没有工人,那么新项仍然在缓冲列表中,工作不会执行

  • 仅有一个工人能弹出一个值;当消费者多于生产者,一些消费者将得到通知然后发现没有什么可做的

  • 当你重新启动工人,你应该假设有积压工作可以处理

  • 除此之外,对于阻塞弹出的语义是相同的

并发

应该注意的是管道/多路复用器/future-value 等方式与基于 延续的异步代码也是做得非常好的;例如:

string value = await db.StringGet(key);
if (value == null) {
    value = await ComputeValueFromDatabase(...);
    db.StringSet(key, value, flags: CommandFlags.FireAndForget);
}
return value;