当前位置: 首页 > 知识库问答 >
问题:

Spring集成JavaDSL-Kafka:如何使用Kafka支持将ProducerListener添加到ProducerConfiguration?

竺翰海
2023-03-14

我正在开发一个spring集成应用程序,它有一个Kafka出站通道适配器,并使用spring集成java dsl配置流。

  • spring集成核心:4.2.4。释放
  • spring集成Kafka:1.3.0。释放
  • spring集成java dsl:1.1.2。发布

我已经配置了类似于以下代码段的消息处理程序规范。

            KafkaProducerMessageHandlerSpec messageHandlerSpec = Kafka
                .outboundChannelAdapter()
                .addProducer(new ProducerMetadata<String, byte[]>(topicName, String.class, byte[].class,
                        new StringSerializer(), new ByteArraySerializer()), "localhost:9092");

我想添加一个ProducerListener。此功能已添加到此处的Spring集成kafka中。https://github.com/spring-projects/spring-integration-kafka/pull/80

您能为我提供使用Java DSL添加ProducerListener的适当机制吗。

谢谢

共有1个答案

彭涵衍
2023-03-14

嗯,这听起来像是对SI Kafka 1.3的SIJavaDSL的一个很好的改进。现在我们有一个与SI Kafka 1.2. x的兼容性。

与此同时,我们将尝试找出解决方案(请随时提出适当的GH问题),这里有一个解决方法:

KafkaProducerMessageHandlerSpec messageHandlerSpec = Kafka
            .outboundChannelAdapter()
            .addProducer(new ProducerMetadata<String, byte[]>(topicName, String.class, byte[].class,
                    new StringSerializer(), new ByteArraySerializer()), "localhost:9092");

KafkaProducerContext kafkaProducerContext = (KafkaProducerContext) messageHandlerSpec.getComponentsToRegister().iterator().next();
ProducerConfiguration<?, ?> producerConfiguration = kafkaProducerContext.getTopicConfiguration(topicName);
producerConfiguration.setProducerListener(myProducerListener);
 类似资料:
  • 我目前正在尝试编写一个适配器,它将使用来自ActiveMQ的消息并将其发布到Kafka。 我正在考虑使用Spring集成来集成这两个消息传递系统。 我的问题是,我的应用程序不会维护模型的注册表,许多应用程序将使用该注册表将记录发布到activeMQ。我想接收这些javax-jms消息,并想执行一些转换,比如将jmscorrelationId添加到kafka消息中。 另外,另一个要求是仅当kafka

  • 我的pom.xml是: 我错过了什么?

  • 我试着把我的头缠绕在Kafka的溪流和一些根本的问题,我似乎无法解决,我自己。我理解和Kafka状态存储的概念,但我很难决定如何实现它。我还在使用Spring Cloud Streams,这在此基础上增加了另一个层次的复杂性。 我的用例: 一些有状态规则如下所示: 我当前的实现将所有这些信息存储在内存中,以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会坚持到Kafka的州立商店。

  • 当使用int-kafka: out站通道适配器生成到kafka时,似乎没有可用的错误通道。在这种情况下,如何处理reties次后无法向kafka生成消息? 任何可能导致kafka失败的错误。(以下代码只是来自Internet的代码片段,只是想知道如何向其添加错误句柄)

  • 我用的是Kafka。请在下面找到测试程序。 我正在使用Storm 0.8.1。在Storm 0.8.2中存在多方案类。我会用那个。我只想知道早期版本是如何通过实例化String计划()类来工作的?我在哪里可以下载早期版本的Kafka喷口?但是我怀疑这是一个正确的选择,而不是在Storm 0.8.2上工作。???(困惑) 当我在暴风集群上运行代码(如下所示)时(即当我推我的拓扑时),我得到以下错误(

  • 我有一个spring boot应用程序,它使用spring boot V2.1.9.发布版本。我只想升级SpringKafka。但是升级后我没有发现这样的方法错误。 我试图将spring boot升级到2.2.0,但由于一些其他插件,如swagger不支持。