我在AWS上建立了一个汇流平台。我的源是MySql,我已经使用debezium连接器将它连接到Kafka connect。源的数据格式是JSON。现在在KSQL中,我创建了一个派生主题,并将JSON主题转换为AVRO以使数据能够使用JDBC连接器下沉到MYSQL。我使用了以下查询:
CREATE STREAM json_stream (userId int, auth_id varchar, email varchar) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON');
派生主题:
create TABLE avro_stream WITH (VALUE_FORMAT='AVRO') AS select * from json_stream;
我曾尝试使用JSON消息直接下沉到mysql,但失败了,因为连接器需要模式,所以带模式的JSON或Avro消息都可以帮助我下沉数据。
[2019-07-09 13:27:30,239] WARN task [0_3] Skipping record due to
deserialization error. topic=[avro_stream] partition=[3] offset=[144]
(org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.connect.errors.DataException: avro_stream at
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at
io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:44)
at
io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
at
org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at
org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at
org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at
org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
at
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
at
org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.SerializationException:
Error deserializing Avro message for id -1 Caused by:
org.apache.kafka.common.errors.SerializationException: Unknown magic
byte!
{
"name": "debezium-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.user": "XXXXX",
"auto.create.topics.enable": "true",
"database.server.id": "1",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "X.X.X.X:9092",,
"database.history.kafka.topic": "XXXXXXX",
"transforms": "unwrap",
"database.server.name": "XX-server",
"database.port": "3306",
"include.schema.changes": "true",
"table.whitelist": "XXXX.XXXX",
"key.converter.schemas.enable": "false",
"value.converter.schema.registry.url": "http://localhost:8081",
"database.hostname": "X.X.X.X",
"database.password": "xxxxxxx",
"value.converter.schemas.enable": "false",
"name": "debezium-connector",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.whitelist": "XXXXX",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
},
"tasks": [
{
"connector": "debezium-connector",
"task": 0
}
],
"type": "source"
KSQL以string
的形式编写键,因此当您使用Avro进行值序列化时,键不是。因此,您的接收器工作器需要这样配置:
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<url to schema registry>",
如果您已经将worker本身配置为使用Avro,那么您可以只覆盖连接器配置的key.converter
。
这是包含列表的POJO JSON具有以下结构: 运行时,web服务的结果运行良好,但反序列化会打印此错误:
问题内容: 我试图反序列化以DateTime作为修饰符的类: 但是,当我尝试tro反序列化时,却遇到以下异常: 我用它来反序列化: 还有我的jsonData的示例: 问题答案: 期望使用无参数构造函数。的最新版本没有这样的构造函数。 如果您已固定格式,即。应该只是一个时间戳,那么你可以简单地注册与。它将在内部用于字段。您可以摆脱注释。 您需要添加库。
我正在尝试使用网络库kryonet创建一个多人游戏,我得到了连接和发送字符串的工作,但现在我正在尝试发送对象。我想做的是发送一个数组列表,但它给了我这个错误。我也尝试只发送一个对象,它给了我同样的错误。 我尝试在Metor类中创建一个没有参数的构造函数,但这也不起作用 编辑:所以我得出的结论是,Kryonet在序列化slick2d图像类时有问题,或者与Kryonet和slick2d有不同的冲突。
我有以下课 以及以下测试: 我收到以下错误: com.fasterxml.jackson.databind.exc.MismatchedInputException:无法构造com.store.domain.model.Cart实例(尽管至少存在一个Creator):无法从[Source:(String)"{"id":"56c7b5f7-115b-4cb9-9658-acb7b849d5d5"}"
我尝试调用一个url谁接受列表。 发送的数据为 “{”时间戳“:1445958336633,”状态“:400,”错误“:”错误请求“,”异常“:”org.springframework.http.converter.httpmessagenotreadableException“,”消息“:”无法读取文档:无法反序列化START_OBJECT令牌之外的java.util.arraylist实例\n
我有2个模式: 模式1(旧模式): 我用一个布尔字段更新了模式: 方案2(新方案): kafka主题包含属于旧模式(schema1)的消息。更新使用者模式后,即使更新字段中存在默认值,使用者也无法反序列化旧模式消息。 根据Avro文档: 阿夫罗博士 我得到以下错误,而反序列化: 当记录缺少字段时,为什么默认值未应用于使用者?任何帮助都非常感谢。提前致谢!