当前位置: 首页 > 知识库问答 >
问题:

如何使用debezium从Postgres流式传输更改

阎英朗
2023-03-14

使用debezium从Postgres流式更改

已完成的设置:

  1. Docker设置。
  2. 启动Postgres、zookeeper、kafka和debezium Connector。
  3. 使用decoderbufs、wal2json(postgres)设置远程数据库。
  4. 使用curl连接到debezium。
  5. 创建了一个观察者。

问题:当我启动watcher时,它读取了之前发生的所有更改,但当任何插入完成时,kafka会向debezium抛出一个异常,表示“在change event Producer.This connector will be stopped.”,并且在watcher中没有显示。

由于我对这些概念很陌生,不知道我在环境设置中错过了什么,这是我在堆栈溢出中的第一个问题,请忽略我的错误。

主要问题是我的本地数据库运行良好。
有没有人能帮上忙?
请提前致谢

019-05-02 14:09:47,242 WARN   Postgres|kafkaserver|records-stream-producer  Closing replication stream due to db connection IO exception...   [io.debezium.connector.postgresql.RecordsStreamProducer]
2019-05-02 14:09:47,365 INFO   ||  WorkerSourceTask{id=kafka-public-connector-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,366 INFO   ||  WorkerSourceTask{id=kafka-public-connector-0} flushing 0 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,375 ERROR  ||  WorkerSourceTask{id=kafka-public-connector-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
    at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
    at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1037)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:251)
    at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:134)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:120)
    ... 5 more
Caused by: java.io.EOFException
    at org.postgresql.core.PGStream.receiveChar(PGStream.java:308)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1079)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1035)
    ... 12 more
2019-05-02 14:09:47,387 ERROR  ||  WorkerSourceTask{id=kafka-public-connector-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
  • 是否有解决此问题的完整指南?
  • 的主要目标是,我有一个数据库中有大量的数据,其中一个应用程序(生产者)从另一个服务器获取数据,并将所有数据保存在我们自己的数据库中,另一个应用程序(消费者)获取存储并应用业务逻辑和前端。在这里,我想用这个debezium和kafka部分替换从另一个应用程序(消费者)到db的点击。
  • 或者有什么方法可以这样做。

共有1个答案

莘欣怿
2023-03-14

感谢大家。解决了上述问题。远程数据库中的安装存在实际问题。很少有其他依赖项没有正确安装,如postgis、protobuf-c、decoderbufs,一旦正确安装,问题就解决了。

 类似资料:
  • 我第一次尝试Kafka,并使用AWS MSK设置Kafka群集。目标是将数据从MySQL服务器流式传输到Postgresql。我使用debezium MySQL连接器作为源,使用Confluent JDBC连接器作为接收器。 MySQL配置: 注册Mysql连接器后,其状态为“正在运行”,并捕获MySQL表中所做的更改,并以以下格式在消费者控制台中显示结果: 我的第一个问题:在表中“金额”列是“十

  • 我对Kafka和Kafka Connect世界很陌生。我正在尝试使用Kafka(在MSK上)、Kafka Connect(使用PostgreSQL的Debezium连接器)和RDS Postgres实例来实现CDC。Kafka Connect在我们部署在AWS中的集群中的K8 pod中运行。 在深入研究所使用的配置的细节之前,我将尝试总结问题: 连接器启动后,它会按预期向主题发送消息(snahps

  • 问题内容: 我需要执行一个没有长度的八位字节流的api。它只是实时数据流。我遇到的问题是,当我发出请求时,似乎试图在将信息读入输入流之前先等待内容的结尾,但是它没有看到内容的结尾和NoHttpResponse异常的超时。以下是我的代码的简化版本: 问题答案: 编辑2 因此,如果您对线程/可运行程序/处理程序不满意,而对Android AsyncTask不满意,我将直接转到HttpUrlConnec

  • 使用 Spark 2 连接器从 CosmosDB 流式传输可以使用 Changefeed 实现。 https://docs.microsoft.com/en-us/azure/cosmos-db/spark-connector#streaming-reads-from-cosmos-db 我们如何在Spark 3中做同样的事情?我正在使用Cosmos DB Apache Spark 3联机事务处理

  • 问题内容: 我有一个200MB的文件,想通过下载提供给用户。但是,由于我们希望用户仅下载一次此文件,因此我们这样做: 强制下载。但是,这意味着整个文件必须加载到内存中,这通常不起作用。我们如何以每块kb的速度将文件流式传输给他们? 问题答案: 尝试这样的事情

  • 我正在尝试从数据库中加载50000个包含文本的项目,标记它们并更新标签 我使用pg-Promise和pg-query-stream来达到这个目的 我能够让流部分正常工作,但是更新变得有问题,有这么多更新语句 这是我现有的代码 db对象是来自pg promise的对象,而tagger只是从变量标记中包含的文本中提取一个标记数组 根据我在诊断中看到的内容,执行了太多更新语句,有没有办法对它们进行批处理