Debezium连接器的Kafka connect事件是Avro编码的。
在传递给Kafka connect standalone服务的connect-standalone.properties中提到了以下内容。
key.converter=io.confluent.connect.avro.AvroConverter
value.confluent=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
schema.registry.url=http://ip_address:8081
internal.key.converter.schema.registry.url=http://ip_address:8081
internal.value.converter.schema.registry.url=http://ip_address:8081
使用这些属性配置Kafka使用者代码:
Properties props = new Properties();
props.put("bootstrap.servers", "ip_address:9092");
props.put("zookeeper.connect", "ip_address:2181");
props.put("group.id", "test-consumer-group");
props.put("auto.offset.reset","smallest");
//Setting auto comit to false to ensure that on processing failure we retry the read
props.put("auto.commit.offset", "false");
props.put("key.converter.schema.registry.url", "ip_address:8081");
props.put("value.converter.schema.registry.url", "ip_address:8081");
props.put("schema.registry.url", "ip_address:8081");
在消费者实现中,下面是读取键和值组件的代码。我使用REST从模式注册表中获取键和值的模式。
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
解析密钥工作正常。在解析消息的值部分时,我得到了ArrayIndexOutOfBoundsException。
下载了Avro的源代码并进行了调试。发现GenericDatumReader。readInt方法正在返回负值。该值应为数组(符号)的索引,因此应为正值。
尝试使用 kafka-avro-独立-使用者使用事件,但它也抛出了一个数组索引。因此,我的猜测是消息在Kafka连接(生产者)上编码不正确
以下是问题:
有很多帖子中,Avro反序列化抛出了ArrayIndexOutOfBoundsException,但无法将其与我面临的问题联系起来。
按照http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html的步骤
因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:
我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到
有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示:
我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决
我有一个Kafka集群(版本:0.10.1.0),有9个代理和10个分区。 我尝试使用camel kafka从java应用程序中获取消息。这是我的pom。xml 这只是我使用的与骆驼Kafka相关的依赖项。下面是骆驼Kafka消费者代码。 我正在使用文档中指定的KafkaURIhttps://camel.apache.org/components/latest/kafka-component.ht
我尝试收听主题,以查看哪个使用者保存了什么值的offsets,但这并不奏效... 我尝试了以下操作: 为控制台使用者创建了配置文件,如下所示: 谢谢! 码头