我现在正在使用java设计redis pub/sub系统,但遇到了一个问题。我将向您展示细节:
出版商:
public class RedisMessagePublisher implements MessagePublisher {
public RedisMessagePublisher(StringRedisTemplate redisTemplate,ChannelTopic topic)
{
this.redisTemplate = redisTemplate;
this.topic = topic;
}
private StringRedisTemplate redisTemplate;
private ChannelTopic topic;
@Override
public void publish(String message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
发布者是正确的,可以正确工作。
然后让我们转到subscriber类:
public class RedisMessageSubscriber implements MessageListener {
//action inspect here
private Action2<Message, byte[]> action;
public void setAction(Action2<Message, byte[]> action) {
logger.info("action set");
this.action = action;
}
private static Logger logger = LogManager.getLogger(RedisMessageSubscriber.class);
@Override
public void onMessage(Message message, byte[] bytes) {
logger.info("===> redis subscribe message in <===");
if (action != null)
action.call(message, bytes);
else
logger.info("===> action is null <===");
}
}
在订阅类中,我使用RxJava注入Action,这样我可以更容易地使用它。
但问题是,在我发布了来自publisher的消息后,我可以确认消息可以传输到onMessage方法,日志打印不是我所期望的:
===> redis subscribe message in <===
===> action is null <===
我所期望的是,当我发布一条新消息时,订阅者得到了它并运行了我创建的操作。
我用来触发下面的发布者和订阅者的服务:
@RestController("redispubsubcontroller")
@RequestMapping(value = "/redis")
public class redispubsubcontroller {
@Autowired
private RedisMessagePublisher redisMessagePublisher;
@Autowired
private RedisMessageSubscriber redisMessageSubscriber;
private static Logger logger = LogManager.getLogger(redispubsubcontroller.class);
@RequestMapping(value = "/publisher", method = {RequestMethod.GET})
public ApiResponse getConfig(String message,HttpServletRequest request,
HttpServletResponse response) {
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
redisMessagePublisher.publish(message);
return new ApiResponse("success","message sent");
}
}
从上面的代码,你可以c我订阅了主题,并为订阅者设置了一个新操作:
redisMessageSubscriber.setAction(new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
ObjectMapper objectMapper = new ObjectMapper();
try {
String result = objectMapper.readValue(message.getBody(), String.class);
logger.info("receive:"+result);
} catch (IOException e) {
e.printStackTrace();
}
}
});
但是我不知道为什么,在触发发布者之后,订阅者可以得到消息,但是仍然保持NULL Action,我创建的Action没有传递给它。
有人能帮忙吗?这个机制有问题吗?
====编辑=====
RedisMessageConfig代码如下:
@Configuration
public class RedisMessageConfig {
@Bean
ChannelTopic topic() {
return new ChannelTopic("useraddresspubsub:queue");
}
@Bean
MessageListenerAdapter messageListener() {
return new MessageListenerAdapter(new RedisMessageSubscriber());
}
@Autowired
private RedisConnectionFactory JedisConnectionFactory;
@Bean
RedisMessageListenerContainer redisContainer() {
final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(JedisConnectionFactory);
container.addMessageListener(messageListener(), topic());
return container;
}
}
====解决====
最后我根据mp的想法解决了这个问题,将myredismessagesubscriber稍微更改为myredismessageconfig,因为流程是从redismessageconfig到redismessagesubscriber的,所以在redismessageconfig中,我需要首先向其注入操作,然后redismessageconfig将创建新的redismessagesubscriber并保留新创建的操作。代码如下:
@Component
public class MyRedisMessageConfig extends RedisMessageConfig {
private static Logger logger =LogManager.getLogger(MyRedisMessageConfig.class);
public MyRedisMessageConfig() {
super.action = new Action2<Message, byte[]>() {
@Override
public void call(Message message, byte[] bytes) {
String result = new String(message.getBody());
logger.info("received:" + result);
}
};
}
}
这不是MessageListener
的工作方式。此外,还可以创建共享的可变状态。两次并发调用同时更改RedisMessageSubscriber
的状态。
我假设在一个线程中设置操作
,而在另一个线程上接收消息时,会遇到可见性问题。
如果每个MessageListener
需要不同的行为,则创建实现该行为的多个侦听器。
问题内容: 我正在尝试使用Redis Cookbook示例: 我在这里取得了成功,但从未得到“消息”。 我的客户端index.htm是这个 客户如何发布到特定的Redis“聊天”频道。 问题答案: 如果您在node.js程序中使用Redis发布/订阅功能,则应使用一个Redis客户端连接来监听某个频道,使用另一个Redis客户端连接来发送常规命令和/或将消息发布到您的频道。从node_redis文
我有一个使用ActiveMQ的JMS生产者/订阅者的简单Spring应用程序,配置如下: 我试过所有可能的解决办法,但没有一个奏效。我们非常感谢任何帮助
我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”
我部署服务器发送事件资源与泽西2.41/Java7/tomcat 7. 我得到的错误是 2013年12月16日凌晨4:04:40组织。阿帕奇。卡塔琳娜。果心StandardWrapperValve:Servlet。路径为[/trackapp]的上下文中servlet[ServletAdapter]的服务()引发异常[java.lang.UnsupportedOperationException:s
我正在使用mosquitto(http://mosquitto.org/)作为MQTT代理,并寻求关于负载平衡订阅服务器的建议(针对相同的主题)。这是如何实现的?我所读到的关于该协议的所有内容都表明,相同主题的所有订阅者都将获得一条发布消息。 这似乎效率很低,因此我正在寻找一种方法,将发布的消息以循环方式提供给连接的订阅服务器之一,以确保负载平衡状态。
我正在尝试使用tika包来解析文件。Tika已成功安装,使用cmd中的代码运行 我在Jupyter中的代码是: 然而,我收到以下错误: 2018-07-25 10:20:13,325[MainThread][WARNI]无法看到启动日志消息;正在重试...2018-07-25 10:20:18,329[MainThread][WARNI]无法看到启动日志消息;正在重试...2018-07-25 1