我对Kafka和Kafka Connect世界很陌生。我正在尝试使用Kafka(在MSK上)、Kafka Connect(使用PostgreSQL的Debezium连接器)和RDS Postgres实例来实现CDC。Kafka Connect在我们部署在AWS中的集群中的K8 pod中运行。
在深入研究所使用的配置的细节之前,我将尝试总结问题:
我的连接器配置如下所示:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.user": "root",
"database.dbname": "insights",
"slot.name": "cdc_organization",
"tasks.max": "1",
"column.blacklist": "password, access_key, reset_token",
"database.server.name": "insights",
"database.port": "5432",
"plugin.name": "wal2json_rds_streaming",
"schema.whitelist": "public",
"table.whitelist": "public.kafka_connect_cdc_test",
"key.converter.schemas.enable": "false",
"database.hostname": "de-test-sre-12373.cbplqnioxomr.eu-west-1.rds.amazonaws.com",
"database.password": "MYSECRETPWD",
"value.converter.schemas.enable": "false",
"name": "source-postgres",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.html" target="_blank">apache.kafka.connect.json.JsonConverter",
"snapshot.mode": "initial"
}
我们尝试了plugin.name
属性的不同配置:wal2josn
、wal2json_streaming
和wal2json_rds_streaming
。
连接器和数据库之间的连接没有问题,因为我们已经看到,只要连接器启动,消息就会流过。
上述连接器是否存在配置问题,使我们无法看到与主题中出现的新更改相关的消息?
谢啦
您的连接器配置看起来有点混乱。我对Kafka也很陌生,所以我真的不知道这个问题,但这是我的连接器配置,适合我。
{
"name":"<connector_name>",
"config": {
"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"database.server.name":"<server>",
"database.port":"5432",
"database.hostname":"<host>",
"database.user":"<user>",
"database.dbname":"<password>",
"tasks.max":"1",
"database.history.kafka.boostrap.servers":"localhost:9092",
"database.history.kafka.topic":"<kafka_topic_name>",
"plugin.name":"pgoutput",
"include.schema.changes":"true"
}
}
如果此配置不能正常工作,请尝试查找日志控制台;有时错误不是控制台的最后一次写入
我正在尝试使用Debezium将Amazon RDS中托管的Postgres SQL db与Kafka主题连接起来。 我正在遵循以下教程: 我的kafka和kafka connect服务启动良好,kafka connect服务还在/usr/share/java dir中接收我的debezium postgres连接器jar。 但是,在尝试通过kafka connect API使用以下curl命令附
问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp
它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka
我有一个AWS RDS上的Postgres Db和一个Kafka连接连接器(Debezium Postgres)在桌上监听。连接器的配置: 该表不像其他表那样频繁更新,这最初导致了复制延迟,如下所示: 它会变得如此之大,以至于有可能耗尽所有磁盘空间。 我添加了一个心跳,如果我登录到kafka代理并设置这样的控制台消费者:它将转储所有心跳消息,然后每1000毫秒显示一条新消息。 然而,插槽的大小仍在
上下文 我编写了几个小的Kafka Connect连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与模式注册表集成,因此数据使用Avro序列化。 我使用Landoop提供的fast data dev Docker映像将它们部署到本地Kafka环境中 基本设置工作,并每秒生成一条记录的消息 但是,我想更改主题名称策略。默认设置生成两个主题: