使用最新的 kafka 和 confluent jdbc 接收器连接器。发送一个非常简单的 Json 消息:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "msg"
}
],
"optional": false,
"name": "msgschema"
},
"payload": {
"id": 222,
"msg": "hi"
}
}
但是出现错误:
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
Jsonlint说Json是有效的。我在 kafka 配置中保留了 json schemas.enable=true
。有什么指示吗?
我最近遇到了同样的问题,我花了多次重试才弄清楚遗漏了什么:
以下设置对我有用:
key.converter.schemas.enable=false
value.converter.schemas.enable=true
此外,请确保该表以前存在于数据库中,并且连接器不应尝试创建表。设置auto.create=false
为了使用JDBC接收器,您的流消息必须有一个模式。这可以通过使用Avro和Schema Registry来实现,也可以通过使用JSON和Schema来实现。如果在最初运行源属性文件后配置了< code>schemas.enable=true,您可能需要删除主题,重新运行接收器,然后再次启动源端。
例:
接收器属性
文件
name=sink-mysql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test-mysql-jdbc-foobar
connection.url=jdbc:mysql://127.0.0.1:3306/demo?user=user1&password=user1pass
auto.create=true
以及示例工作配置文件< code > connect-avro-standalone . properties :
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java
并执行
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink.properties
您需要告诉 Connect 您的架构已嵌入到您正在使用的 JSON 中。
您有:
value.converter=org.apache.kafka.connect.json.JsonConverter
但还需要:
value.converter.schemas.enable=true
这些是我的道路: 动态架构操作输出示例: 完整示例:https://github . com/Microsoft/PowerPlatformConnectors/blob/070009 ba 5681 FD 57 ee 6 cc 61d 2 a 380123712 a 088 f/certified-connectors/ticketing . events/API definition . sw
问题内容: 好的,我已经尝试了Stack上的所有解决方案,但没有任何效果。我当前的方法从MainActivity注册了“ SmsListener”接收器。我要做的就是初始化onReceive方法。没有错误;它根本不是在广播。我究竟做错了什么?在此处粘贴适用的代码。可能需要的其他任何东西都可以问。 更新:这是一个未解决的类似问题,当 我在Android6.0.1下测试我正在测试的GoogleHang
我尝试使用以下配置启动JDBC接收器连接器: 但当连接器处于运行状态时,没有任务正在运行: 我多次面对这个问题,但我很困惑,因为它是随机发生的。我的问题与这个问题非常相似。如果有任何帮助,我将不胜感激! 更新。11/04/2019(不幸的是,现在我只有INFO级别日志) 最后,经过几次尝试,我通过更新现有连接器的配置crm_data-sink_db_hh启动了正在运行任务的连接器: 日志: 更新。
用例是将整个消息(JSON)和键作为记录存储在表中,表中有两列“id”和“data”。 数据库是Postgres,它支持JSON的列类型。 根据本文,JSONCon的支持类型是字符串、int64等https://cwiki.apache.org/confluence/display/KAFKA/KIP-301:JsonConver的模式推理 是否可以将数据字段的类型设置为JSON,然后将其存储在P
我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连
我开始对kafka绝望了。对于一个私人项目,一家公司向我发送了一个kafka流。经过长时间的尝试,我终于设法连接到引导服务器并接收到第一条消息。但没有反序列化。目前数据格式如下:4868fa8 该公司以avro格式发送密钥和值,我也获得了几个模式URL。但是我不能正确地使用它们,这样我就能得到可读的数据。不管我怎么输入,它总是出错。只有当我在没有任何模式的情况下进行检索时,我才能得到如上所述的消息