当前位置: 首页 > 知识库问答 >
问题:

如何按类型使用来自Kafka的消息

令狐昌胤
2023-03-14

我对Kafka很陌生,对它有一些疑问。我已经配置了一个kafka消费者来消费来自主题的消息,并且我有不同类型的事件进入主题。f、 e、。

class Event {
    String type;
    Object event;
}

我想配置不同的kafka监听器来消费不同类型的事件。我认为有两种方法可以做到这一点,比如使用字符串(json)格式的事件,转换成事件对象,在不同类型之间切换,执行业务逻辑,或者配置不同的kafka监听器工厂

    public ConcurrentKafkaListenerContainerFactory<String, String> businessEventStreamListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setRecordFilterStrategy(consumerRecord -> !consumerRecord.value().contains("typeOfObjectIWantToConsume"));
        return factory;
    }

因此,第一种方法不是 SOLID,对于第二种方法,我需要创建许多工厂类。有没有办法更优雅地做到这一点?

共有1个答案

戚良弼
2023-03-14

这种业务需求是我们引入< code>@KafkaHandler的真正原因。

在文档中查看更多信息:https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener.

 类似资料:
  • 我目前正在构建一个python gRPC服务器,它将大量不同的proto消息序列化为json,以将它们存储到无sql数据库中。我希望简化此服务器的扩展,这样我们就可以添加新类型,而无需重写gRPC服务器和重新部署。理想情况下,我们希望定义一条新消息,将其放入一个proto文件中,并仅更新客户端。服务器首先应该期望任何类型,但知道。进行序列化/反序列化时要查找的原型文件或文件夹。 我读过关于“任何类

  • 有人能帮我弄清楚这件事吗。 谢了!

  • 我的处理来自的消息。周期性地,按摩无法处理,消费者抛出异常。不管怎样,消费者还是会做出补偿。在Kafka中,我能区分成功消息和失败消息吗?我想,我不能。这是真的吗?如果这是真的,我有一个主要问题: 如何重试失败消息?我知道一些方法,但我不确定它们是否正确。 1) 将“偏移”更改为“提前”。但通过这种方式,成功消息也会重试。 2) 当我捕捉到异常时,我会将此消息发送到另一个主题(例如错误主题)。但这

  • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。

  • 2016-07-05 03:59:25.042 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Tick,ID:{},[30] 2016-07-05 03:59:25.946 O.A.S.D.Executor[INFO]正在处理-2元组的接收消息:源:__System:-1,流:__Metrics_Tick,ID:{},[60] 我的测试

  • 我创建了一个带有10个分区的Kafka主题,并尝试通过单个Kafka消费者来消费消息。但是,kafka consumer并不是从所有分区读取消息。更具体地说,它只使用来自5个特定分区的消息。示例:使用者仅使用来自[0,1,2,3,4]的消息。在重新启动之后,如果它开始使用来自[5,6,7,8,9]的消息,那么它将只使用来自这些分区的消息。下面是kafka-consumer-offset-check