目前,我让生产者和消费者在同一个Spring Boot应用程序中进行配置,但触发消息的Spring云流没有通过Kafka(我正在使用Kafka控制台消费者监控消息),但消费者仍然收到消息(使用与生产者相同的线程)。
如果我在应用程序中删除consumerHandler(@StreamListener),制作人就会成功地将消息发送给Kafka。
这个有什么配置吗?默认情况下,我需要将Spring云流消息发送到Kafka。
生产者和消费者配置:
@Component
public interface NotificationProcessor {
String EMAIL_NOTIFICATION = "email-notification";
@Input(EMAIL_NOTIFICATION)
SubscribableChannel receiveEmail();
@Output(EMAIL_NOTIFICATION)
MessageChannel sendEmail();
}
以下是我的一些配置:
spring:
cloud:
stream:
kafka:
binder:
autoAddPartitions: true
brokers: ${KAFKA_BROKERS:localhost:9092}
auto-create-topics: true
configuration:
auto.offset.reset: latest
bindings:
email-notification:
group: ${EMAIL_GROUP:email-group-notification}
destination: ${EMAIL_TOPIC:email-notification}
contentType: application/json
producer:
partitionCount: 9
consumer:
partitioned: true
concurrency: 3
instance-count: 1
instance-index: 0
触发发送消息的API:
@RestController
@RequestMapping("/api")
public class TestResource {
private final Logger log = LoggerFactory.getLogger(TestResource.class);
private final NotificationProcessor notificationProcessor;
public TestResource(NotificationProcessor notificationProcessor) {
this.notificationProcessor = notificationProcessor;
}
@ApiOperation(value = "Test api")
@GetMapping(value = "/send-email2", produces = APPLICATION_JSON_VALUE)
public ResponseEntity<Boolean> test2() {
EmailMessage test = EmailMessage.builder()
.to(Arrays.asList(Receiver.builder().email("test@nomail.com").build())
).type(EContentType.JSON)
.build();
log.info("send email message to kafka");
notificationProcessor.sendEmail().send(MessageBuilder.withPayload(test).build());
return ResponseEntity.ok(Boolean.TRUE);
}
}
消费者处理者:
@EnableBinding(NotificationProcessor.class)
public class NotificationProducer {
private final Logger log = LoggerFactory.getLogger(NotificationProducer.class);
public NotificationProducer(){}
@StreamListener(NotificationProcessor.EMAIL_NOTIFICATION)
public void receiveEmail(@Payload Message<EmailMessage> message) {
log.info("Receive email message from kafka");
EmailMessage emailMessage = message.getPayload();
}
}
从提供的信息中不清楚您要将信息发送到哪里。什么频道?默认情况下,通道是内部和直接的,因此,如果您发送到订阅的同一个通道,您将完全绕过message broker(即Kafka)。这可以解释这两种症状(没有代理和相同的html" target="_blank">线程)。
也就是说,基于注释的配置模型已被弃用。在过去的几年里,我们完全迁移到函数式编程模型,这要简单得多,并且旨在帮助您不要考虑内部实现,例如通道,因为它们实际上是内部使用的(代码和代理适配器之间的桥梁)。
还有一个新组件,它允许您向专门为您所拥有的场景设计的代理发送消息——StreamBridge。
无论如何,看看它并考虑重构你的应用程序。至少要确保发送到绑定到代理目标的通道,并订阅绑定到同一目标的另一个通道,从而确保到代理的往返。
最后但并非最不重要的一点是,我仍然很困惑,为什么你需要发送给代理,然后在同一个应用程序中订阅它?为什么要增加网络开销?
这是我的消费者: 所以当运行我的制作人时,它最终会出错。任何人都知道这意味着什么,如果这可能是错的。
我使用的是Spring Cloud Stream和RabbitMQ,我在同一个应用程序中配置了生产者和消费者,但使用了两个不同的通道。问题是,我的应用程序有五个副本在Kubernetes上运行,但只有发送消息的pod会使用它们,其他四个不会使用。在RabbitMQ控制台上,没有收到任何消息,消息速率图不会更改。当我注释OrdersListener时,消息会成功发送到RabbitMQ集群。有人知道可
在这种情况下,我是否需要求助于Kafka事务API来在消费者轮询循环中创建事务生产者,在该循环中,我在事务中执行:(1)处理消耗的记录和(2)在关闭事务之前提交它们的偏移量。在这种情况下,普通的commitsync/commitasync是否有效?
本教程演示了如何发送和接收来自Spring Kafka的消息。 首先创建一个能够发送消息给Kafka主题的Spring Kafka Producer。 接下来,我们创建一个Spring Kafka Consumer,它可以收听发送给Kafka主题的消息。使用适当的键/值序列化器和解串器来配置它们。 最后用一个简单的Spring Boot应用程序演示应用程序。 下载并安装Apache Kafka 要
我目前正在开发Kafka模块,我正在使用Kafka通信的抽象。我能够集成生产者 Spring Boot测试类 监听器类 我的问题是:在测试类中,我断言分区、有效负载等是从BlockingQueue轮询的,然而,我的问题是如何验证用KafkaListener注释的类中的业务逻辑是否得到正确执行,并根据错误处理和其他业务场景将消息路由到不同的主题。在一些示例中,我看到了CountDownLatch的断
向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前