当前位置: 首页 > 面试题库 >

使用同一线程Jedis订阅多个频道

暨承平
2023-03-14
问题内容

我有一个使用Redis发布/订阅在Java中使用Jedis客户端在客户端之间传输消息的应用程序。我希望能够在用户键入命令时在运行时订阅频道,但是由于订阅是一个阻塞操作,因为它在调用订阅的线程上进行侦听,所以我不确定以后如何订阅其他频道在原始线程上。

例:

private PubSubListener psl = new PubSubListener();

public void onCommand(String[] args) {
    subscribe(args[0]);
}

public void subscribe(String channel) {
    Jedis jedis = jedisPool.getResource();

    jedis.subscribe(psl, channel);
}

这将起作用,除了调度命令的线程将用于轮询Redis,而我将无法使用该线程订阅更多的频道。


问题答案:

我观察到了同样的问题,即订阅后订阅线程就会阻塞。为了解决这个问题,我使用Netty实现了一个优化的发布/
订阅客户端,并将其合并到此处的Jedis分支中。这不是一个全面的解决方案,我还没有时间真正完成它,但是它适用于基本的频道和模式订阅。

使用以下命令获取pubsub实例:

public static OptimizedPubSub getInstance(String host, int port, String auth, long timeout)

使用以下方式发布/取消模式订阅:

public ChannelFuture psubscribe(String... patterns)
public ChannelFuture punsubscribe(String... patterns)

您可以忽略返回的ChannelFuture,除非您想100%确定您的请求通过了(这是异步的)。

使用以下方式发布/取消频道订阅:

public ChannelFuture subscribe(String... channels)
public ChannelFuture unsubscribe(String... channels)

然后实现SubListener实例:

public interface SubListener {
    /**
     * Callback when a message is published on a subscribed channel
     * @param channel The channel the message was received on
     * @param message The received message
     */
    public void onChannelMessage(String channel, String message);

    /**
     * Callback when a message is published on a subscribed channel matching a subscribed pattern
     * @param pattern The pattern that the channel matched
     * @param channel The channel the message was received on
     * @param message The received message
     */
    public void onPatternMessage(String pattern, String channel, String message);
}

并使用以下方法注册/取消注册侦听器:

public void registerListener(SubListener listener)
public void unregisterListener(SubListener listener)

OptimizedPubSub永远不会阻止,并且事件会异步传递到已注册的SubListeners。

叉子现在有点旧了,所以它在当前形式下可能对您没有用,但是您可以轻松地从该包中提取源代码并独立构建它。依赖关系是Jedis和Netty。

抱歉,我没有更全面的解决方案。



 类似资料:
  • 面试问题 比如说,我们有一个在Employee表中有200万条记录的表,我们需要削减每个员工10%的工资(需要做一些处理),然后将其保存回collection。你怎样才能有效地做到这一点。 我问他,我们可以使用executor框架来创建多个线程,这些线程可以从表中获取值,然后我们可以处理并将其保存到列表中。 然后他问我,你将如何检查一个记录是否已经被处理,我不知道(如何做)。 甚至我也不确定我是否

  • 物联网有很多设备,通过订阅设备的topic可以监听物联网设备接收到的消息。 请求方式: "|4|1|2|topic|\r" 参数: topic 设置订阅的topic,获取设备topic可参考教程 返回值: "|4|1|2|1|\r" 订阅成功 "|4|1|2|2|1|\r" topic订阅达到上限(一个OBLOQ最多订阅5个topic),订阅失败 "|4|1|2|2|2|\r" topic订阅失败

  • 在我的程序中,我需要多个线程使用和编辑同一个变量,但它似乎不起作用。这是我的意思的一个例子,这将是我的主要课程。 这将是我的Thread类: 我当场编写了这段代码,所以可能会有一些错误,但没关系。我的程序基本上需要这样做,但不是每次打印数字加1,而是所有线程只需多次打印相同的数字0。请帮帮我,谢谢。

  • 所以,在上面的例子中,可以看到有2个订阅者接受相同类型的处理。现在,在post()的时候,所有的函数都将被调用吗?如果接收StartRequest的两个函数将被调用,那么它们将以哪种顺序被调用?

  • 这只是为了澄清发布/订阅线程。 我的疑问是在正常的发布者/订阅者模式中,订阅者和发布者是在同一个线程上运行还是在不同的线程中运行? 还是取决于实现? 到目前为止,我所想的是不同的订阅会有自己的线程,而publisher在其上运行的是自己的线程?

  • 在我的应用程序中,我有4个redis通道,应用程序在4个不同的线程中订阅该通道,但从长远来看,每4小时,订阅这些通道的所有4个线程都会出现异常。 JAVA网SocketException:连接超时(读取失败) 我无法找到这个超时的确切原因,如果你们的任何输入都会更有帮助的话。