当前位置: 首页 > 知识库问答 >
问题:

spring cloud stream kafka-功能性方法:在生成墓碑记录时从不调用消费者

夏俊人
2023-03-14

问题与https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455但对于功能性方法

    @Bean
    public Consumer<Message<Foo>> fooConsumer() {
        return message -> {
            String key = // From message header - KafkaHeaders.RECEIVED_MESSAGE_KEY
            Foo payLoad = message.getPayload();

            if (payLoad == null) {
                deleteSomething(key);
            } else {
                addSomething(key, payLoad);
            }
        };
    }

当产生适当的“Foo”(json)时,会调用fo消费者。但是当产生“null”有效负载/墓碑时,永远不会调用消费者函数

我尝试使用自定义“Convertor”方法的变通方法,该方法正在发挥作用:

@Component
public class KafkaNullConverter extends MappingJackson2MessageConverter {

    @Override
    protected Object convertFromInternal(Message<?> message, @NonNull Class<?> targetClass, Object conversionHint) {
        if (message.getPayload() instanceof KafkaNull) {
            return Foo.builder().isDeleted(true).build(); // Note: new `isDeleted` field added to `Foo`
        }

        return super.convertFromInternal(message, targetClass, conversionHint);
    }
}

然后,食品消费者可以检查有效载荷。isDeleted()来处理大小写。但这是冗长的,会污染pojo/模型类,并且必须对每个消费者重复。

我理解spring集成不能使用null。但是,有没有更好的/标准的方法来用功能方法处理“墓碑”用例?

版本:3.0.4。发布

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-kafka</artifactId>
   <version>3.0.4.RELEASE</version>
</dependency>

共有1个答案

夹谷山
2023-03-14

这是最近固定的主要分支https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/936和https://github.com/spring-cloud/spring-cloud-function/issues/557

 类似资料:
  • 这是创建ListenerContainerFactory的类 这是我用@KafKalistener注释的Listener类 这是KafkaListenerConfig类,它接受引导服务器、主题名称等。

  • 我用以下属性创建了一个Kafka主题 min.cleanable.dirty.ratio=0.01,delete.retention.ms=100,segment.ms=100,cleanup.policy=紧凑 假设我按1111:1,1111:2,1111: null,2222:1的顺序插入k-v对,现在除了最后一条消息,日志压缩在其余消息上运行并清除前两条消息,但保留1111: null 根据

  • 我已经构建了一个DockerLinux镜像,用于虚幻引擎的持续集成构建,具体方法如下:https://docs.adamrehn.com/ue4-docker/use-cases/continuous-integration 一旦构建完成(这需要一个具有大量磁盘空间的在线Linux虚拟机),我将该映像导出,下载到我的Windows 10计算机上,并尝试将该映像导入我的本地版本Docker中进行测试

  • 我的应用程序由一个带有POST方法的REST控制器组成,用于提交我必须使用生产者发送到主题的数据。 这是控制器 使用Spring-Cloud-Stream版本 从3.1版开始,和注释被弃用,所以我尝试切换到新的方式来设置生产者,我就是这样工作的 最后在应用程序中。yaml我有这个 现在的问题是,当我启动应用程序时,方法被无限调用(我在主题中看到消息)。然后使用供应商似乎我被迫在供应商内部定义消息数

  • 我的结构是这样的:日志文件 但我卡在Kafka到Logstash部分。 首先,Filebeat可以向Kafka生成消息,我可以使用以下方式检查它: 也可以由命令使用: 但是,当我尝试使用logstash来消费主题时,没有任何东西可以被检索到,Zoomaster一直在抛出: 2017-11-13 16:11:59205[myid:]-信息[NIOServerCxn.工厂:0.0.0.0/0.0.0.

  • 我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka