当前位置: 首页 > 知识库问答 >
问题:

无法使用Kafka将数据从MySQL流式传输到Postgres

商飞尘
2023-03-14

我第一次尝试Kafka,并使用AWS MSK设置Kafka群集。目标是将数据从MySQL服务器流式传输到Postgresql。我使用debezium MySQL连接器作为源,使用Confluent JDBC连接器作为接收器。

MySQL配置:

  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.server.id": "1",
  "tasks.max": "3",
  "internal.key.converter.schemas.enable": "false",
  "transforms.unwrap.add.source.fields": "ts_ms",
  "key.converter.schemas.enable": "false",
  "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "internal.value.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"

注册Mysql连接器后,其状态为“正在运行”,并捕获MySQL表中所做的更改,并以以下格式在消费者控制台中显示结果:

{"id":5,"created_at":1594910329000,"userid":"asldnl3r234mvnkk","amount":"B6Eg","wallet_type":"CDW"}

我的第一个问题:在表中“金额”列是“十进制”类型并包含数值,但在消费者控制台中为什么它显示为字母数字值?

对于作为目标数据库的Postgresql,我使用JDBC接收器连接器,配置如下:

"name": "postgres-connector-db08",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "topics": "mysql-cash.kafka_test.test",
  "connection.url": "jdbc:postgresql://xxxxxx:5432/test?currentSchema=public",
  "connection.user": "xxxxxx",
  "connection.password": "xxxxxx",
  "insert.mode": "upsert",
  "auto.create": "true",
  "auto.evolve": "true"

注册JDBC连接器后,当我检查状态时,它会出现错误:

{"name":"postgres-connector-db08","connector":{"state":"RUNNING","worker_id":"x.x.x.x:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"x.x.x.x:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
 org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
 org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
 org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
 org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
 org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 java.util.concurrent.FutureTask.run(FutureTask.java:266)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'postgres-connector-db08' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='mysql-cash.kafka_test.test',partition=0,offset=0,timestamp=1594909233389) with a HashMap value and null value schema.
 io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:83)
 io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
 io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
 io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
 org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
... 10 more
"}],"type":"sink"}

为什么会出现此错误?我在水槽配置中遗漏了什么吗?

共有1个答案

贾烨
2023-03-14

https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html#data-mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter e.g. the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled.

由于JSON是普通的(没有架构),并且连接器配置了value.converter.schemas.enable“:“false”(禁用架构的JSON转换器),因此应使用架构注册表设置Avro转换器:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#applying-架构

 类似资料:
  • 问题内容: 我想使用elasticsearch-river-mysql以便将数据从MySQL数据库连续传输到ElasticSearch。我是ES和Rivers的初学者,所以希望您能为我的问题提供帮助。 据我所知,数据将从MySQL数据库流式传输到ES集群,后者将自动对其进行索引。那是对的吗?我需要了解任何超时或限制吗? 关系数据库表之间的外键关系将如何转换为ES?包含外键的表行是否将成为ES文档的

  • 我按照Kantega/storm-twitter-workshop项目链接的步骤进行操作,但我在某一点上卡住了。在测试凭证时,通过运行文件夹中的主类作弊 cd作弊< br> mvn编译exec:Java-dexec . classpathscope = compile-dexec . main class = storm . starter . cheatingtwitterfuntopology

  • 我正在尝试使用apache flume将数据加载到hbase中。当我使用flume将数据传输到hadoop时,它工作得很好。但是当我启动flume代理将数据加载到hbase时,我得到了NoClassDefFoundError。 这是我的水槽配置: flume-env.sh 代理4.conf

  • 问题内容: 我正在尝试使用流使用Hapi将数据发送到浏览器,但无法确定我们的方式。具体来说,我正在使用请求模块。根据文档,该对象接受流,所以我尝试了: 引发错误。在文档中说流对象必须与stream2兼容,所以我尝试了: 现在,这不会引发服务器端错误,但是在浏览器中,请求永远不会加载(使用chrome)。 然后我尝试了这个: 并且在控制台 中 输出了数据,所以我知道流不是问题,而是Hapi。我如何在

  • 我想将数据从高音扬声器流式传输到hdfs,我使用了以下命令:./bin/Flume-ng代理-n TwitterAgent-c conf-f /usr/lib/apache-flume-1.4.0-bin/conf/flume.conf 我不能得到我期望的结果。没有数据流。 谁有linus命令可以将数据从tweeter传输到hdfs?

  • 大家好,我正在使用debezium捕获Mongo中的更改,并将它们推送到mysql中。我正在使用以下链接https://github.com/debezium/debezium-examples/tree/master/unwrap-mongodb-smt我正在把postgres数据库替换为mysql数据库,但我无法这样做。 这是我修改后的jdbc-sink.json,我使用mysql url连接