当前位置: 首页 > 知识库问答 >
问题:

春云流Kafka消费测试

华善
2023-03-14

我正试图按照GitHub的建议设置测试

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
    try {
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
        template.setDefaultTopic("words");
        template.sendDefault("foobar");

    --> ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "output");
        log.debug(cr);
    }
    finally {
        pf.destroy();
    }

其中StreamProcessor设置为

@StreamListener
    @SendTo("output")
    public KStream<?, WordCount> process(@Input("input") KStream<Object, String> input) {

        return input.map((key, value) -> new KeyValue<>(value, new WordCount(value, 10, new Date(), new Date())));
    }

-->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”)

  • 我希望能够测试流处理的消息。

共有1个答案

颜欣怡
2023-03-14

您需要使用输出绑定到的实际主题。您有spring.cloud.stream.bindings.output.destination的配置吗?这应该是您需要使用的值。如果不设置,默认值将与绑定相同-在本例中输出

 类似资料:
  • 我的使用者绑定到匿名使用者组,而不是我指定的使用者组。 我的春靴应用 我的输入输出通道接口 我的控制台日志-- :在3.233秒内启动ConsumerApplication(JVM运行于4.004):[使用者Clientid=Consumer-3,Groupid=Anonymous.0D0C87D6-EF39-4BFE-B475-4491C40CAF6D]发现组协调器Singh:9092(ID:2

  • 我试图改变生产者和消费者配置的顺序,但没有帮助。 编辑:我已经添加了完整的application.yml。当我第一次引导服务时,这个主题在Kafka中是不存在的。 它感觉在生产者和消费者配置之间有冲突,我认为它说有3个分区的原因是消费者中的并发性是3,所以它首先创建有3个分区的主题,然后当它移动到生产者配置时,它不调整分区计数。

  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

  • Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?

  • 我有一个常见的任务问题,我可以找到任何解决方案或帮助(也许我需要传递一些属性来工作?)我使用本地服务器1.3.0.M2并创建简单的流 在日志中,我得到了这个: 2017-09-28 12:31:00.644 信息 5156 --- [ -C-1] o.. a.k.c.c.internals.AbstractCoordinator : 成功加入第 1 代的组测试 2017-09-28 12:31:0

  • 我最近开始学习Kafka,最后就问了这些问题。 > 消费者和流的区别是什么?对我来说,如果任何工具/应用程序消费来自Kafka的消息,那么它就是Kafka世界中的消费者。 流与Kafka有何不同?为什么需要它,因为我们可以使用消费者API编写自己的消费者应用程序,并根据需要处理它们,或者将它们从消费者应用程序发送到Spark? 我做了谷歌对此,但没有得到任何好的答案。抱歉,如果这个问题太琐碎了。