当前位置: 首页 > 知识库问答 >
问题:

节点Redis XREAD阻止订阅

史淳
2023-03-14

我正在将redis发布/子系统转换为redis streams,这样我就可以为服务器发送的事件添加一些容错功能。

用传统的方式订阅很简单:

import { createClient } from 'redis';

const redisOptions = {
  url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

redis.on("message", (channel, message) => {
  console.log(channel);
  console.log(message);
});

redis.subscribe('foo');

这会永久阻塞,并保持连接打开。在这种情况下,发布到redis将添加到您的日志中。

const json = { a: 1, b: 2 };

redis.publish('foo', JSON.stringify(json));

切换到流时,您使用XREAD而不是订阅,使用XADD而不是发布,数据会有很大的不同。我正在努力解决的部分是阻塞。

redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
  if (err) return console.error('Error reading from stream:', err);

  str.forEach(message => {
    console.log(message);
  });
}

发送消息时,我的“订阅”会接收第一条消息,但不会记录更多消息。

共有1个答案

年光明
2023-03-14

说实话,我问这个问题只是因为谷歌对我没有好处,而且我找不到其他人发布关于这个问题的帖子。希望这有帮助!

所以,XREAD只是在最初的调用中阻塞。它将在设定的时间段内(如果将时间设置为0,则无限期地)静坐等待,但一旦接收到数据,它的职责就被视为已完成,并且它将解锁。要使“订阅”保持活动状态,需要再次调用XREAD,并使用流中的最新id。这将替换我们传递给它的初始值$

递归似乎是一个完美的解决方案:

const xread = ({ stream, id }) => {
  redis.xread('BLOCK', 0, 'STREAMS', stream, id, (err, str) => {
    if (err) return console.error('Error reading from stream:', err);

    str[0][1].forEach(message => {
      id = message[0];
      console.log(id);
      console.log(message[1]);
    });

    setTimeout(() => xread({ stream, id }), 0)
  });
}

xread({ stream: 'asdf', id: '$' })

 类似资料:
  • 我有一种情况,我在JavaFX中有一个容器节点(HBox)和两个子节点。当我从左边的子节点拖到右边时,我会得到很多拖到左边节点的事件,最后当我在右边节点上释放鼠标时,我会在父节点中得到一个单击事件。下面是一些复制这种情况的代码。 我想知道的是:如何阻止家长接收此点击事件?我在使用事件的左、右节点上尝试了各种事件过滤器和事件处理程序,但似乎找不到合适的过滤器和事件处理器来防止单击事件发送到父节点。有

  • 我使用更改昵称做了一个挂机命令。每当机器人角色之上的用户使用此命令时。我得到以下错误(节点:169) 这将在稍后终止bots进程。这将进一步终止bots进程。是否有任何方法可以在不增加bot角色的情况下停止终止节点进程。这是我的命令代码 你能帮我吗?

  • 我有一个在Node开发的API。JS,打字稿听localhost:3001我有一个Angular的前端应用程序,打字稿听localhost:4200 我正在尝试使用ngx-image-cropper上传图像文件,将其转换为base 64,从前端到API。 当补丁(更新)http请求发送到API时,我得到: 访问位于'http://localhost:3001/member/10'从原点'http:

  • 问题内容: 我对Go如何处理非阻塞IO感到困惑。API在我看来基本上是同步的,并且在Go上观看演示时,听到诸如“和调用块”之类的注释并不罕见。 从文件或网络读取时,Go是否使用阻塞IO?还是当在Go Routine中使用某种魔术来重写代码? 来自C#背景,这感觉非常不直观,在C#中,当使用异步API时我们使用了关键字。这清楚地表明,API可以产生当前线程,并在以后的延续中继续。 因此,TLDR;当

  • 我一直在试验项目反应器和反应流。我在使用使流在不同的线程上运行时遇到了一个问题。将我的代码放在主线程中,我需要主线程块,直到流完成,所以我做了这样的事情: 然后我注意到有一个方法执行阻塞。但是我不能同时使用订阅和块最后,因为它们不返回。 有什么优雅的方法可以做到这一点吗?

  • JavaFX有一个问题,当我弹出一个新阶段时,该新窗口将从具有当前焦点的任何Windows应用程序中获取焦点 我希望它弹出到前面,但不对焦,所以如果用户在其他地方打字,他们可以继续打字等。 在Swing中,您可以通过以下方式解决此问题: JavaFx中似乎没有类似的选项。 下面的例子,当你点击按钮时,它将弹出一个新的阶段,获得焦点(注意,我不想要求焦点回来,因为在真正的应用程序中,当弹出发生时,用