当前位置: 首页 > 知识库问答 >
问题:

Kafka连接:使用debezium从Postgres到主题的流媒体变化

武友樵
2023-03-14

我对Kafka和Kafka Connect世界很陌生。我正在尝试使用Kafka(在MSK上)、Kafka Connect(使用PostgreSQL的Debezium连接器)和RDS Postgres实例来实现CDC。Kafka Connect在我们部署在AWS中的集群中的K8 pod中运行。

在深入研究所使用的配置的细节之前,我将尝试总结问题:

  • 连接器启动后,它会按预期向主题发送消息(snahpshot)
  • 一旦我们对表进行任何更改(创建、更新、删除),就不会向主题发送任何消息。我们希望看到有关对表所做更改的消息。

我的连接器配置如下所示:

{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.user": "root",
    "database.dbname": "insights",
    "slot.name": "cdc_organization",
    "tasks.max": "1",
    "column.blacklist": "password, access_key, reset_token",
    "database.server.name": "insights",
    "database.port": "5432",
    "plugin.name": "wal2json_rds_streaming",
    "schema.whitelist": "public",
    "table.whitelist": "public.kafka_connect_cdc_test",
    "key.converter.schemas.enable": "false",
    "database.hostname": "de-test-sre-12373.cbplqnioxomr.eu-west-1.rds.amazonaws.com",
    "database.password": "MYSECRETPWD",
    "value.converter.schemas.enable": "false",
    "name": "source-postgres",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.html" target="_blank">apache.kafka.connect.json.JsonConverter",
    "snapshot.mode": "initial"
}

我们尝试了plugin.name属性的不同配置:wal2josnwal2json_streamingwal2json_rds_streaming

连接器和数据库之间的连接没有问题,因为我们已经看到,只要连接器启动,消息就会流过。

上述连接器是否存在配置问题,使我们无法看到与主题中出现的新更改相关的消息?

谢啦

共有1个答案

秦诚
2023-03-14

您的连接器配置看起来有点混乱。我对Kafka也很陌生,所以我真的不知道这个问题,但这是我的连接器配置,适合我。

{
   "name":"<connector_name>",
   "config": {
      "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
      "database.server.name":"<server>",
      "database.port":"5432",
      "database.hostname":"<host>",
      "database.user":"<user>",
      "database.dbname":"<password>",
      "tasks.max":"1",
      "database.history.kafka.boostrap.servers":"localhost:9092",
      "database.history.kafka.topic":"<kafka_topic_name>",
      "plugin.name":"pgoutput",
      "include.schema.changes":"true"
   }
}

如果此配置不能正常工作,请尝试查找日志控制台;有时错误不是控制台的最后一次写入

 类似资料:
  • 我正在尝试使用Debezium将Amazon RDS中托管的Postgres SQL db与Kafka主题连接起来。 我正在遵循以下教程: 我的kafka和kafka connect服务启动良好,kafka connect服务还在/usr/share/java dir中接收我的debezium postgres连接器jar。 但是,在尝试通过kafka connect API使用以下curl命令附

  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka

  • 我有一个AWS RDS上的Postgres Db和一个Kafka连接连接器(Debezium Postgres)在桌上监听。连接器的配置: 该表不像其他表那样频繁更新,这最初导致了复制延迟,如下所示: 它会变得如此之大,以至于有可能耗尽所有磁盘空间。 我添加了一个心跳,如果我登录到kafka代理并设置这样的控制台消费者:它将转储所有心跳消息,然后每1000毫秒显示一条新消息。 然而,插槽的大小仍在

  • 上下文 我编写了几个小的Kafka Connect连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与模式注册表集成,因此数据使用Avro序列化。 我使用Landoop提供的fast data dev Docker映像将它们部署到本地Kafka环境中 基本设置工作,并每秒生成一条记录的消息 但是,我想更改主题名称策略。默认设置生成两个主题: