我正在尝试用kafka绑定构建一个简单的云流应用程序。让我描述一下设置。1、我有一位制作人正在制作主题1
2。有一个流活页夹,经过一些处理后将主题1绑定到主题2。
@StreamListener(MyBinder.INPUT)
@SendTo(MyBinder.OUTPUT_2)
public String handleIncomingMsgs(String s) {
logger.info(s); // prints all the messages
return s;
}
java prettyprint-override">@Service
@EnableBinding(MyBinder.class)
public class LogMsg {
@StreamListener(MyBinder.OUTPUT_2)
public void handle(String board) {
logger.info("Received payload: " + board); //prints every alternate messages
}
public interface ViewsStreams {
String INPUT = "input";
String OUTPUT_1 = "output_1";
String OP_USERS = "output_2";
@Autowired
@Input(INPUT)
SubscribableChannel job_board_views();
@Autowired
@Output(OUTPUT_1)
MessageChannel outboundJobBoards();
@Autowired
@Output(OUTPUT_2)
MessageChannel outboundUsers();
}
我是这些技术的新手。无法弄清楚这里出了什么问题。有人能帮忙吗?
你的猜测是正确的;OUTPUT\u 2通道上有两个使用者—侦听器和发送消息的绑定。
他们每个人都会收到备用消息。
我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有
我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:
我可以在命令行上针对Kafka位置安装发送和接收消息。我也可以通过Java代码发送消息。这些消息显示在Kafka命令提示符中。我还有一个Kafka消费者的Java代码。代码昨天收到了消息。但是今天早上没有收到任何消息。代码没有更改。我想知道属性配置是否不太正确。这是我的配置: 制片人: 生产记录设置为 消费者: 对于Java代码: 少了什么?
当监听设备后,会返回接收到的消息数据。 请求方式: 无 返回值: "|4|2|5|message|" 返回接收到的消息 参数 message 返回的消息内容
Spring Kafka批处理消费者只能收到一两条消息,我们已经将fetch.min.bytes增加到9000 即使在增加值后,我们只收到1或2条消息,我们是否还需要增加fetch.min.bytes和fetch.max.wait.ms的值,或者我们是否需要添加任何其他配置,或者我们是否需要减少最大轮询记录大小?在本地环境中,我们收到10条消息,但在AWS MSK集群中,我们收到1或2条消息 消费