我试图在WSO2流集成器和本地Postgres DB之间设置更改数据捕获(CDC)。
我遵循CDCWithListeningMode示例来实现CDC,并使用pgoutput作为逻辑解码插件。但是当我运行应用程序时,我会得到以下日志。
[2020-04-23_19-02-37_460] INFO {org.apache.kafka.connect.json.JsonConverterConfig} - JsonConverterConfig values:
converter.type = key
schemas.cache.size = 1000
schemas.enable = true
[2020-04-23_19-02-37_461] INFO {org.apache.kafka.connect.json.JsonConverterConfig} - JsonConverterConfig values:
converter.type = value
schemas.cache.size = 1000
schemas.enable = false
[2020-04-23_19-02-37_461] INFO {io.debezium.embedded.EmbeddedEngine$EmbeddedConfig} - EmbeddedConfig values:
access.control.allow.methods =
access.control.allow.origin =
bootstrap.servers = [localhost:9092]
header.converter = class org.apache.kafka.connect.storage.SimpleHeaderConverter
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
key.converter = class org.apache.kafka.connect.json.JsonConverter
listeners = null
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
offset.flush.interval.ms = 60000
offset.flush.timeout.ms = 5000
offset.storage.file.filename =
offset.storage.partitions = null
offset.storage.replication.factor = null
offset.storage.topic =
plugin.path = null
rest.advertised.host.name = null
rest.advertised.listener = null
rest.advertised.port = null
rest.host.name = null
rest.port = 8083
ssl.client.auth = none
task.shutdown.graceful.timeout.ms = 5000
value.converter = class org.apache.kafka.connect.json.JsonConverter
[2020-04-23_19-02-37_516] INFO {io.debezium.connector.common.BaseSourceTask} - offset.storage = io.siddhi.extension.io.cdc.source.listening.InMemoryOffsetBackingStore
[2020-04-23_19-02-37_517] INFO {io.debezium.connector.common.BaseSourceTask} - database.server.name = localhost_5432
[2020-04-23_19-02-37_517] INFO {io.debezium.connector.common.BaseSourceTask} - database.port = 5432
[2020-04-23_19-02-37_517] INFO {io.debezium.connector.common.BaseSourceTask} - table.whitelist = SweetProductionTable
[2020-04-23_19-02-37_517] INFO {io.debezium.connector.common.BaseSourceTask} - cdc.source.object = 1716717434
[2020-04-23_19-02-37_517] INFO {io.debezium.connector.common.BaseSourceTask} - database.hostname = localhost
[2020-04-23_19-02-37_518] INFO {io.debezium.connector.common.BaseSourceTask} - database.password = ********
[2020-04-23_19-02-37_518] INFO {io.debezium.connector.common.BaseSourceTask} - name = CDCWithListeningModeinsertSweetProductionStream
[2020-04-23_19-02-37_518] INFO {io.debezium.connector.common.BaseSourceTask} - server.id = 6140
[2020-04-23_19-02-37_519] INFO {io.debezium.connector.common.BaseSourceTask} - database.history = io.debezium.relational.history.FileDatabaseHistory
[2020-04-23_19-02-38_103] INFO {io.debezium.connector.postgresql.PostgresConnectorTask} - user 'user_name' connected to database 'db_name' on PostgreSQL 11.5, compiled by Visual C++ build 1914, 64-bit with roles:
role 'user_name' [superuser: false, replication: true, inherit: true, create role: false, create db: false, can log in: true] (Encoded)
[2020-04-23_19-02-38_104] INFO {io.debezium.connector.postgresql.PostgresConnectorTask} - No previous offset found
[2020-04-23_19-02-38_104] INFO {io.debezium.connector.postgresql.PostgresConnectorTask} - Taking a new snapshot of the DB and streaming logical changes once the snapshot is finished...
[2020-04-23_19-02-38_105] INFO {io.debezium.util.Threads} - Requested thread factory for connector PostgresConnector, id = localhost_5432 named = records-snapshot-producer
[2020-04-23_19-02-38_105] INFO {io.debezium.util.Threads} - Requested thread factory for connector PostgresConnector, id = localhost_5432 named = records-stream-producer
[2020-04-23_19-02-38_293] INFO {io.debezium.connector.postgresql.connection.PostgresConnection} - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLSN=null]
[2020-04-23_19-02-38_704] ERROR {io.siddhi.core.stream.input.source.Source} - Error on 'CDCWithListeningMode'. Connection to the database lost. Error while connecting at Source 'cdc' at 'insertSweetProductionStream'. Will retry in '5 sec'. (Encoded)
io.siddhi.core.exception.ConnectionUnavailableException: Connection to the database lost.
at io.siddhi.extension.io.cdc.source.CDCSource.lambda$connect$1(CDCSource.java:424)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:793)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot create replication connection
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.(PostgresReplicationConnection.java:87)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.(PostgresReplicationConnection.java:38)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$ReplicationConnectionBuilder.build(PostgresReplicationConnection.java:362)
at io.debezium.connector.postgresql.PostgresTaskContext.createReplicationConnection(PostgresTaskContext.java:65)
at io.debezium.connector.postgresql.RecordsStreamProducer.(RecordsStreamProducer.java:81)
at io.debezium.connector.postgresql.RecordsSnapshotProducer.(RecordsSnapshotProducer.java:70)
at io.debezium.connector.postgresql.PostgresConnectorTask.createSnapshotProducer(PostgresConnectorTask.java:133)
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:86)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:45)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:677)
... 3 more
Caused by: io.debezium.jdbc.JdbcConnectionException: ERROR: could not access file "decoderbufs": No such file or directory
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:145)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.(PostgresReplicationConnection.java:79)
... 12 more
Caused by: org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:307)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:293)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:270)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:266)
at org.postgresql.replication.fluent.logical.LogicalCreateSlotBuilder.make(LogicalCreateSlotBuilder.java:48)
at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initReplicationSlot(PostgresReplicationConnection.java:108)
... 13 more
Debezium默认为decoderbufs插件-“无法访问文件”decoderbufs“:没有这样的文件或目录”。
根据这个答案,问题是由于decoderbufs插件的配置。
请帮我解决这个问题。我没有找到任何可以帮助我的资源。
您要么需要将Debezium更新到最新的1.1版本--这将使您能够使用plugin.name
config选项使用pgoutput
插件,要么需要将decoderbufs.so
库部署到PostgreSQL数据库中。
我推荐前者,因为0.8.3是非常旧的版本。
我正在尝试使用spring security和wso2 identity server 5.1.0集成(SSO)多个服务提供商。根据博客,我已经将spring security SAML示例与wso2集成在一起,仅针对一个服务提供商,并且运行良好,但我无法为多个服务提供商执行SSO。我什么都查过了,但运气不好。 请在下面找到我认为应该是什么,但我不知道如何实现这一点。 WSO2是:创建了一个具有唯
我已经看到了在中间访问通量的问题,我想知道为什么我用以下方式成功地在通量中编写逻辑: 首先,我想知道为什么我从未将错误抛出控制台,但在调试时我看到了错误。我还想知道这是如何工作的,为什么我需要变量(它总是产生,即使流可以继续并正常工作)。当我省略
我正在研究一个Flink流式处理器,它可以从Kafka读取事件。这些事件由其中一个字段键控,并且在减少和输出之前应该在一段时间内加窗。我的处理器使用事件时间作为时间特性,因此从它所消耗的事件中读取时间戳。以下是它目前的样子: 我所知道的事件如下: null null
我们正在尝试整合吉拉和詹金斯。我们使用Zephyr进行测试管理,对于Jenkins集成,我们已经安装了ZAPI。在Jenkins安装了Zephyr插头。到目前为止,Jenkins可以与Jira连接,并向我们展示项目。我们正在遵循这里描述的步骤,但是这个指导方针告诉我们要有一个Subversion存储库。目前我们的项目是驻留在Git中,我们希望保持它的这种方式。 为了测试的目的,我已经把项目放在一个
我想明白的是。 如何给出多个筛选器表达式,例如,我想从服务器获取.csv和.xml文件。 如何忽略单个文件类型,例如,我想只忽略具有.txt类型的文件,并获取rest文件。
我想在C#中使用SikuliIntegrator。我以管理员的身份运行VS,通过NuGet manager安装SikuliIntegrator,并想在简单的任务上测试他。 这是我的密码 运行代码(并在屏幕上准备好我的图片)后,我得到的只是异常“####失败”知道为什么吗? 我不想使用Sikuli4Net,因为它看起来像是在web ap上工作,我只需要在桌面应用上简单点击几下。 我尝试了Java中的