我有一个向rabbitmq发送消息的服务,消费者对消息进行一些操作并重新排队。
我可以成功地将初始消息发送给rabbitmq,但问题是,如果消息需要修改,我无法将任何已使用的消息重新发送给rabbitmq。
@Service
public class MyService {
/**
* The template
*/
@Autowired
private AmqpTemplate amqpTemplate;
private final RabbitMQConfig config;
public void send(String message) {
try {
amqpTemplate.convertAndSend("ex", "r", message);
}
catch (Exception e) {
e.printStackTrace();
}
}
}
@Bean
public Queue myQueue() { return new Queue("my-queue");
// etc...
@Bean
MessageListenerAdapter myListenerAdapter(MyListener listener) {
return new MessageListenerAdapter(listener, "listener");
}
@Bean
MyListener myListener() {
return new MyListener();
}
public class MyListener {
public void receiveMessage(String message) {
// ... some code
// if message requires modification, then repush
new Repush().push(message);
}
}
我试图用new创建一个新类,但“MyService”始终为空
@Component
public class Repush {
@Autowired
private MyService myService;
public void push(String message) {
// myService is null at this point
}
}
不要使用new
创建bean。Spring只在豆类中注入田地。您的myListener
是一个bean。只需在该类中添加带有@autowired
批注的repush
字段。
public class MyListener {
@Autowired
private Repush repush;
public void receiveMessage(String message) {
// ... some code
// if message requires modification, then repush
repush.push(message);
}
}
D: \软件\Kafka\Kafka2.10-0.10.0.1\bin\windows 我使用上面的命令来消费消息,有什么我错过的吗?帮助我: 这个 那些是生产者和消费者......
生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较
我有一个Kafka主题,目前有3个分区。我希望我的消费者从同一个分区读取,但每条消息都应该以循环方式发送给不同的消费者。有可能实现吗?
我目前正在使用带有的Kafka绑定器的Spring Cloud Stream为我的Spring Boot微服务执行消息记录。 我有: 生产者将消息发布到订阅频道 在消息从生产者发布到流并被消费者收听的整个过程中,可以观察到preSend方法被触发了两次: 一次在生产者端-消息发布到流时 然而,出于日志记录的目的,我只需要在消费者端截获并记录消息。 是否有任何方法可以仅在一侧(例如消费者侧)截获SC
我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d
我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?