当前位置: 首页 > 知识库问答 >
问题:

使用avro格式的apache kafka消息

索卓
2023-03-14

我想使用spring-Kafka库使用spring boot配置的消费者来使用来自Kafka代理的消息,源是一个JDBC连接器,它负责从MySQL数据库提取消息,这些消息需要被使用

下面是我的application.yml文件

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  @KafkaListener(topics = "Sample-Topic", groupId = "group_id")
    public void consume(String message) throws IOException {
        System.out.println(message);
    }

共有1个答案

邓鸿彩
2023-03-14

您需要使用avro反序列化器而不是StringDeserializer

 类似资料:
  • 我想使用Avro来序列化我的Kafka消息的数据,并想将其与Avro模式存储库一起使用,这样我就不必将模式包含在每条消息中。 将Avro与Kafka结合使用似乎是一件很流行的事情,许多博客/堆栈溢出问题/用户组等都提到了将模式Id与消息一起发送,但我找不到一个实际的示例来说明它应该去哪里。 我想它应该放在Kafka消息头的某个地方,但我找不到一个明显的地方。如果它在Avro消息中,则必须根据模式对

  • 我使用kafka流来消耗来自一个主题的JSON字符串,处理并生成存储在另一个主题中的响应。然而,需要对响应主题产生的消息需要采用avro格式。 我已经尝试使用键作为字符串序列和值作为规范AvroSerde 以下是我创建拓扑的代码: 以下是我的配置 当我尝试使用该示例时,我看到了以下错误:

  • 因此,我们计划使用Avro在融合的Kafka生态系统上进行交流。我目前对Avro的理解是,每条消息都有自己的模式。如果是这样的话,我们需要模式注册表来解决版本更新吗? 我问,因为在每条消息中携带模式可以防止需要像模式注册表这样的东西来将消息ID映射到模式。还是我在这里错过了什么?

  • 我用本地安装的Confluent4.0.0尝试了官方模式-注册表-汇合示例(Consumer/Producer),它可以在发送post请求和在listener接收时发送“Sensor”avro消息,但当我使用Confluent4.0.0附带的kafka-avro-console-consumer工具查看发送的avro消息时,该工具引发了以下错误(a)。我还尝试使用kafka-avro-consol

  • 我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所

  • 问题内容: 我已经在资源束中存储了一些消息。我正在尝试按以下格式设置这些消息。 假设第一个参数(即实际消息)存储在以某种方式检索到的属性文件中。 第二个参数(即5)是一个动态值,应放置在不会发生的占位符中。下一行打印, 您即将删除{0}行。 占位符不会替换为实际参数。 这里是撇号- 。我试图像往常一样逃避它,尽管它没有用。需要进行哪些更改才能使其正常工作? 问题答案: 向模式添加多余的撇号以确保显