我用Flink的table API创建了一个表。
CREATE TABLE recommendations (
...
) WITH (
'connector' = 'kafka',
'topic' = 'my_kafka_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.kerberos.service.name' = 'kafka',
'scan.startup.mode' = 'latest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry-address',
'value.fields-include' = 'EXCEPT_KEY'
);
当运行SQL以查看记录时,我得到:
Flink SQL> select * from default_catalog.default_database.recommendations ;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ArrayIndexOutOfBoundsException: -25
Flink SQL> select * from default_catalog.default_database.recommendations ;
[ERROR] Could not execute SQL statement. Reason:
java.io.IOException: Failed to deserialize Avro record.
我知道有一些坏的avro记录被推送到Kafka主题中。在JSON格式中,有一个选项可以通过设置'json.ignore-解析-错误'='true'
来跳过/过滤这些记录。当从合流avro格式读取时,我们可以跳过这些记录吗?
这并不理想,但不幸的是,尽管有一个模式注册表,但我无法控制要推送到Kafka的内容。
AVRO目前没有这样的选择。这里有一张公开票https://issues.apache.org/jira/browse/FLINK-20091
我有一个问题,我的记录json可以为null。如何处理avro模式中的空记录?给出的文档是针对我想要为null记录获取的null属性的。
我有以下:Source-Kafka topic(trans)Channel-memory Sink-Hdfs(avro _ event) kafka主题trans中的数据是使用c#生产者编写的,并且有数千条avro记录。当我运行我的水槽消费者时,它开始将数据下沉到hdfs。问题是数据的格式是:模式数据模式数据 而不是: 模式数据数据 我猜这是因为flume需要一个带有{header} {body}
我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以
我尝试使用Kafka流将一个带有String/JSON消息的主题转换为另一个作为Avro消息的主题。 并得到如下所示的异常: 这是正确的做法吗?我对Kafka溪流和阿夫罗是新来的
我有以下场景: 生产者通过Confluent的REST代理(在Confluent的模式注册表上注册模式)向Kafka主题发送Avro编码的消息,如http://docs.confluent.io/3.0.0/kafka-rest/docs/intro.html#produce-and-consument-avro-messages所述 Spring Cloud Stream enabled mes
我有一个环境,我使用一个Kafka Connect Worker,它使用Oracle数据库中的一些数据,然后将其推送到Avro格式的Kafka主题中。 现在,我需要创建一个Kafka连接接收器来使用这个AVRO消息,将其转换为Json,然后将其写入Redis数据库。 到目前为止,我只能在Redis上写我在topic中使用的同样的AVRO消息。我曾尝试使用转换器,但可能误解了其用法。 吼我的工人和水