当前位置: 首页 > 知识库问答 >
问题:

如何从apache nifi在kafka主题中产生Avro消息,然后使用kafka流阅读它?

屠嘉
2023-03-14

我想使用apache nifi将一些通用数据生成到kafka主题中,并且我希望这些数据是avro格式的。我为它所做的:

  1. 在架构注册表中创建新架构:

{“type”:“record”,“name”:“my_schema”,“namespace”:“my_namespace”,“doc”:“”,“fields”:[{“name”:“key”,“type”:“int”},{“name”:“value”,“type”:[“null”,“int”]},{“name”:“event_time”,“type”:“long”}]}

然后我试着用Kafka的Streams来读它:

公共类测试{private final static Logger Logger=Logger.getlogger(kafkafilterusingcacheavro.class);

public static void main(String[] args) {
    Properties properties = new Properties();

    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
    properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, GenericRecord> source = builder.stream("topic");
    source.foreach((k, v) -> logger.info(String.format("[%s]: %s", k, v.toString())));

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, properties);
    streams.start();
}

}

GenericAvroSerde-https://html" target="_blank">github.com/johnreedlol/kafka-streams/blob/master/src/main/Java/io/confluent/examples/streams/utils/GenericAvroSerde.Java

结果我得到了错误:

原因:org.apache.kafka.Common.Errors.SerializationException:反序列化id-1的Avro消息时出错,原因:org.apache.kafka.Common.Errors.SerializationException:未知的魔术字节!

我还尝试在AvroReader\Writer中显式设置avro模式,但没有帮助。另外,如果我尝试简单地从topic读取字节并将其转换为字符串表示,我会得到如下结果:

objavro.schema{“type”:“record”,“name”:“my_schema”,“namespace”:“my_namespace”,“doc”:“”,“fields”:[{“name”:“key”,“type”:“int”},{“name”:“value”,“type”:[“null”,“int”]},{“name”:“event_time”,“type”:“long”}]}avro.codecsnappy[4]m[©Q:àg0]“/}1/2{[4]m[©Q:àg0

我该怎么修好它?

共有1个答案

洪国兴
2023-03-14

在PublishKafka处理器中,Avro writer配置有“Embedded Avro Schema”的“Schema Write Strategy”。这意味着写入Kafka的消息是嵌入了完整模式的标准Avro消息。

在您的消费者端(Kafka流)上,它似乎希望使用汇合模式注册表,在这种情况下,它希望的不是嵌入式Avro模式,而是指定模式id的特殊字节序列,然后是裸Avro消息。

假设您希望保持您的使用者原样,那么在NiFi方面,您将希望将Avro Writer的“模式编写策略”更改为“合流模式注册表引用”。我认为这可能还需要您更改Avro reader以使用合流模式注册服务访问模式。

或者,也许有一种方法可以使Kafka流读取嵌入式模式,而不使用汇流模式注册表,但是我以前没有使用过Kafka流,所以我不能说这是否可行。

 类似资料:
  • Debezium连接器的Kafka connect事件是Avro编码的。 在传递给Kafka connect standalone服务的connect-standalone.properties中提到了以下内容。 使用这些属性配置Kafka使用者代码: 在消费者实现中,下面是读取键和值组件的代码。我使用REST从模式注册表中获取键和值的模式。 解析密钥工作正常。在解析消息的值部分时,我得到了Arr

  • 我以前是学习Kafka的传统ActiveMQ用户。我有一个问题。 使用Active MQ,您可以执行以下操作: 将100条消息提交到队列中 我试着在Kafka做同样的事情 如果不启动Consumer,等待它启动,然后运行producer,则此示例不起作用。 谁能告诉我如何修改我的示例程序,以在消息等待被消费的地方执行操作?

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

  • 我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决