当前位置: 首页 > 面试题库 >

PubSub如何在BookSleeve / Redis中工作?

马德厚
2023-03-14
问题内容

我不知道最好的方法是使用BookSleeve发布和订阅频道。目前,我实现了几种静态方法(请参见下文),这些方法可以将内容发布到特定的频道,而新创建的频道则存储在中private static Dictionary<string, RedisSubscriberConnection> subscribedChannels;

考虑到我想发布到通道并订阅同一应用程序中的通道,这是正确的方法吗(注意:我的包装器是一个静态类)。即使我想发布和订阅,创建一个频道就足够了吗?显然,我不会在同一应用程序中发布相同的频道。但是我对其进行了测试,并且效果很好:

 RedisClient.SubscribeToChannel("Test").Wait();
 RedisClient.Publish("Test", "Test Message");

而且有效。

这是我的问题:

1)设置专用的发布频道和专用的订阅频道,而不是同时使用两者,会更有效吗?

2)从语义上来说,“ channel”和“
PatternSubscription”之间有什么区别?我的理解是,我可以通过PatternSubscription()同一频道订阅多个“主题”,对吗?但是,如果我想为每个“主题”调用不同的回调,则必须为每个正确的主题设置一个通道吗?这样有效吗?还是您会建议这样做?

这里的代码片段。

谢谢!!!

    public static Task<long> Publish(string channel, byte[] message)
    {
        return connection.Publish(channel, message);
    }

    public static Task SubscribeToChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);

        RedisSubscriberConnection channel = connection.GetOpenSubscriberChannel();

        subscribedChannels[subscriptionString] = channel;

        return channel.PatternSubscribe(subscriptionString, OnSubscribedChannelMessage);
    }

    public static Task UnsubscribeFromChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);

        if (subscribedChannels.Keys.Contains(subscriptionString))
        {
            RedisSubscriberConnection channel = subscribedChannels[subscriptionString];

            Task  task = channel.PatternUnsubscribe(subscriptionString);

            //remove channel subscription
            channel.Close(true);
            subscribedChannels.Remove(subscriptionString);

            return task;
        }
        else
        {
            return null;
        }
    }

    private static string ChannelSubscriptionString(string channelName)
    {
        return channelName + "*";
    }

问题答案:

1:您的示例(Test)中只有一个频道;通道只是用于特定发布/订阅交换的名称。但是,由于redis API的工作原理的特殊性,有必要使用2个 连接
。具有 任何 订阅的连接不能执行任何其他操作,除了:

  • 听消息
  • 管理自己的订阅(subscribepsubscribeunsubscribepunsubscribe

但是,我不明白这一点:

private static Dictionary<string, RedisSubscriberConnection>

除非您要满足自己的特定需求,否则您不需要多个订阅者连接。单个订阅者连接可以处理任意数量的订阅。快速检查一下client list我的一台服务器,并且与(在撰写本文时)23,002个订阅建立了一个连接。可能可以减少,但是:它起作用。

2:模式订阅支持通配符;因此/topic/1/topic/2/您可以订阅而不是订阅等等/topic/*。所使用的 实际
通道的名称publish作为回调签名的一部分提供给接收者。

都可以。应该注意的是,的性能publish受唯一订阅总数的影响-
但坦率地说,即使您有成千上万个使用subscribe而不是订阅的频道,它的速度仍然非常快(如:0ms)psubscribe

但是从 publish

时间复杂度:O(N + M),其中N是订阅接收通道的客户端数,M是订阅模式(任何客户端)的总数。

我建议阅读pub / sub的redis文档。

编辑以下问题:

a)如果我想保证在接收项目时保留从同一发布者发送项目的顺序,我必须同步(使用Result或Wait())进行发布,对吗?

根本没有任何区别;因为您提到了Result/ Wait(),所以我假设您正在谈论BookSleeve-
在这种情况下,多路复用器已经保留了命令顺序。Redis本身是单线程的,并且将始终按顺序在单个连接上处理命令。但是:订阅服务器上的回调可以异步执行,并且可以(单独)传递给工作线程。我目前正在调查是否可以强制从强制进行此操作RedisSubscriberConnection

更新:从1.3.22开始,您可以将设置CompletionModePreserveOrder-,那么所有回调将依次而不是同时完成。

b)根据您的建议进行调整后,无论有效载荷的大小如何,发布少量项目时我的表现都很好。但是,当同一发布者发送100,000个或更多项目时,性能会迅速下降(仅从我的机器发送时会下降到7-8秒)。

首先,这段时间听起来很高-
在本地测试我得到的(对于100,000个出版物,包括等待所有出版物的响应)是1766ms(本地)或1219ms(远程)(听起来可能违反直觉,但是我的“本地”是)
t运行相同版本的redis;在Centos上,我的“远程”是2.6.12;在Windows上,我的“本地”是2.6.8-pre2。

我不能让实际的服务器建立网络更快或速度,而是:如果这是数据包碎片,我已经添加(只为你)一SuspendFlush()/
ResumeFlush()对。这将禁用快速刷新(即,当发送队列为空时;仍会发生其他类型的刷新);您可能会发现这有帮助:

conn.SuspendFlush();
try {
    // start lots of operations...
} finally {
    conn.ResumeFlush();
}

请注意,Wait直到恢复后才应该这样做,因为在调用之前ResumeFlush(),可能还有一些操作仍在发送缓冲区中。有了这些,我得到了(100,000次操作):

local: 1766ms (eager-flush) vs 1554ms (suspend-flush)
remote: 1219ms (eager-flush) vs 796ms (suspend-flush)

如您所见,它可以为远程服务器提供更多帮助,因为它将通过网络放置更少的数据包。

我无法使用事务,因为稍后要发布的项目无法一次全部使用。有没有一种方法可以在记住这些知识的情况下进行优化?

认为 以上已解决了这一问题-但请注意,最近CreateBatch也添加了它。批处理的操作很像交易-
只是:没有交易。同样,这是减少数据包碎片的另一种机制。在您的特定情况下,我怀疑暂停/恢复(同花)是您最好的选择。

您是否建议使用一个常规的RedisConnection和一个RedisSubscriberConnection或任何其他配置来使此类包装器执行所需的功能?

只要你不执行阻塞操作(blpopbrpopbrpoplpush等),或将过大的BLOB沿着电线(潜在延迟等操作,同时它清除),那么每个类型的单个连接通常工作得很好。但是YMMV取决于您的确切使用要求。



 类似资料:
  • 问题内容: 我想创建一个发布-订阅基础结构,其中每个订阅者都将收听多个(例如100k)频道。 我认为可以将Redis PubSub用于此目的,但是我不确定在这里订阅数千个频道是否是最佳实践。为了回答这个问题,我想知道Redis中的订阅机制在后台如何工作。 另一种选择是为每个订户创建一个频道,并在两者之间放置一些组件,该组件将获取所有消息并将其发布到相关的频道。 还有其他想法吗? 问题答案: Sal

  • 问题内容: 我想在内容中使用redis搜索,这样可能吗? 例如 : 我使用c#和BookSleeve,如果帖子的“已删除”属性为false,我想在帖子的“内容”属性中搜索一个单词,我可以使用HSET并序列化此对象以存储在redis或SADD或…中。 1-存储该对象的最佳方法是什么?2-搜索和筛选对象以退货的最佳方法是什么? [更新] 我可以使用SISMEMBER在redis中搜索确切的单词或短语,

  • 问题内容: 有人问我,PubSub是什么,以及如何创建频道,我指出他是关于redis.io => http://redis.io/topics/pubsub的文章。我认为这很清楚,但是我想知道是否有人有更好的解释。理想情况下,请使用清楚地描述它。 问题答案: 发布/订阅是一个非常简单的范例。就像在广播电台上进行脱口秀一样。那是出版。您希望至少有一个或多个人会选择您的频道来收听广播节目(SUBSCR

  • 问题内容: 我以为我了解这项技术,但也许我不了解。两者有什么区别?为什么要选择一个? 用例:〜实时更新。 问题答案: 我是Faye的作者。从概念上讲,Faye和Redis pub / sub的功能非常相似,实际上,最新版本的Faye可以将Redis用作后端。正如Tom所说,Redis适用于服务器群集内的进程间消息传递,因为Redis客户端将可以访问整个Redis数据库。 如果您想通过网络提供可公开

  • 问题内容: 我使用RedisConnection Set方法设置字节数组,但是如何获取数据?get返回包装的字节数组吗? 链接: http://code.google.com/p/booksleeve/ http://code.google.com/p/protobuf-net/ 这可行,但感觉不对: 更多信息: 问题答案: 那是完全正确的。“ Get”(BookSleeve)返回一个deferr

  • 问题内容: 我最近开始在新的Azure VM上托管我的一个副项目。该应用程序将Redis用作内存缓存。在本地环境中一切正常,但是现在我已将代码移至Azure,我发现Bookleeve中出现了一些奇怪的异常。 当应用首次启动时,一切正常。但是,在闲置约5-10分钟后,对应用程序的下一个请求遇到了网络异常(我现在正在工作,并且没有确切的错误消息,因此我回到家时会张贴这些消息,如果人们认为他们与讨论密切