当前位置: 首页 > 知识库问答 >
问题:

redis订阅服务器无法与redis publisher一起使用

徐承载
2023-03-14

我现在正在使用java设计redis pub/sub系统,但遇到了一个问题。我将向您展示细节:

出版商:

public class RedisMessagePublisher implements MessagePublisher {

public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
    this.redisTemplate = redisTemplate;
    this.topic = topic;
}

private StringRedisTemplate redisTemplate;

private ChannelTopic topic;

@Override
public void publish(String message) {
    redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

发布者是正确的,可以正确工作。

然后让我们转到subscriber类:

public class RedisMessageSubscriber implements MessageListener {

//action inspect here
private Action2<Message, byte[]> action;

public void setAction(Action2<Message, byte[]> action) {
    logger.info("action set");
    this.action = action;
}

private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);

@Override
public void onMessage(Message message, byte[] bytes) {

    logger.info("===> redis subscribe message in <===");

    if (action != null)
        action.call(message, bytes);
    else
        logger.info("===> action is null <===");
    }
}

在订阅类中,我使用RxJava注入Action,这样我可以更容易地使用它。

但问题是,在我发布了来自publisher的消息后,我可以确认消息可以传输到onMessage方法,日志打印不是我所期望的:

===> redis subscribe message in <===
===> action is null <===

我所期望的是,当我发布一条新消息时,订阅者得到了它并运行了我创建的操作。

我用来触发下面的发布者和订阅者的服务:

@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {

@Autowired
private RedisMessagePublisher redisMessagePublisher;

@Autowired
private RedisMessageSubscriber redisMessageSubscriber;

private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);

@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
                                            HttpServletResponse response) {

    redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
        @Override
        public void call(Message message, byte[] bytes) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                String result = objectMapper.readValue(message.getBody(), String.class);
                logger.info("receive:"+result);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    });

    redisMessagePublisher.publish(message);

    return new ApiResponse("success","message sent");
    }
}

从上面的代码,你可以c我订阅了主题,并为订阅者设置了一个新操作:

 redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
    @Override
    public void call(Message message, byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            String result = objectMapper.readValue(message.getBody(), String.class);
            logger.info("receive:"+result);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
});

但是我不知道为什么,在触发发布者之后,订阅者可以得到消息,但是仍然保持NULL Action,我创建的Action没有传递给它。

有人能帮忙吗?这个机制有问题吗?

====编辑=====

RedisMessageConfig代码如下:

@Configuration
public class RedisMessageConfig {

@Bean
ChannelTopic topic() {
    return new ChannelTopic("useraddresspubsub:queue");
}

@Bean
MessageListenerAdapter messageListener() {
    return new MessageListenerAdapter(new RedisMessageSubscriber());
}

@Autowired
private RedisConnectionFactory JedisConnectionFactory;

@Bean
RedisMessageListenerContainer redisContainer() {
    final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(JedisConnectionFactory);
    container.addMessageListener(messageListener(), topic());
    return container;
    }
}

====解决====

最后我根据mp的想法解决了这个问题,将myredismessagesubscriber稍微更改为myredismessageconfig,因为流程是从redismessageconfig到redismessagesubscriber的,所以在redismessageconfig中,我需要首先向其注入操作,然后redismessageconfig将创建新的redismessagesubscriber并保留新创建的操作。代码如下:

@Component
public class MyRedisMessageConfig extends RedisMessageConfig {

private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);

public MyRedisMessageConfig() {
    super.action = new Action2<Message, byte[]>() {
        @Override
        public void call(Message message, byte[] bytes) {
            String result = new String(message.getBody());
                logger.info("received:" + result);
            }
        };
    }
}

共有1个答案

劳灵均
2023-03-14

这不是MessageListener的工作方式。此外,还可以创建共享的可变状态。两次并发调用同时更改RedisMessageSubscriber的状态。

我假设在一个线程中设置操作,而在另一个线程上接收消息时,会遇到可见性问题。

如果每个MessageListener需要不同的行为,则创建实现该行为的多个侦听器。

 类似资料:
  • 问题内容: 我正在尝试使用Redis Cookbook示例: 我在这里取得了成功,但从未得到“消息”。 我的客户端index.htm是这个 客户如何发布到特定的Redis“聊天”频道。 问题答案: 如果您在node.js程序中使用Redis发布/订阅功能,则应使用一个Redis客户端连接来监听某个频道,使用另一个Redis客户端连接来发送常规命令和/或将消息发布到您的频道。从node_redis文

  • 我有一个使用ActiveMQ的JMS生产者/订阅者的简单Spring应用程序,配置如下: 我试过所有可能的解决办法,但没有一个奏效。我们非常感谢任何帮助

  • 我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”

  • 我部署服务器发送事件资源与泽西2.41/Java7/tomcat 7. 我得到的错误是 2013年12月16日凌晨4:04:40组织。阿帕奇。卡塔琳娜。果心StandardWrapperValve:Servlet。路径为[/trackapp]的上下文中servlet[ServletAdapter]的服务()引发异常[java.lang.UnsupportedOperationException:s

  • 我正在使用mosquitto(http://mosquitto.org/)作为MQTT代理,并寻求关于负载平衡订阅服务器的建议(针对相同的主题)。这是如何实现的?我所读到的关于该协议的所有内容都表明,相同主题的所有订阅者都将获得一条发布消息。 这似乎效率很低,因此我正在寻找一种方法,将发布的消息以循环方式提供给连接的订阅服务器之一,以确保负载平衡状态。

  • 我正在尝试使用tika包来解析文件。Tika已成功安装,使用cmd中的代码运行 我在Jupyter中的代码是: 然而,我收到以下错误: 2018-07-25 10:20:13,325[MainThread][WARNI]无法看到启动日志消息;正在重试...2018-07-25 10:20:18,329[MainThread][WARNI]无法看到启动日志消息;正在重试...2018-07-25 1