当前位置: 首页 > 知识库问答 >
问题:

Kafka JDBC源连接器:从列值创建主题

尉迟晔
2023-03-14

我有一个微服务,它使用 OracleDB 在EVENT_STORE表中发布系统更改。表EVENT_STORE包含一个列 TYPE,其中包含事件类型的名称。

JDBC Source Kafka Connect可能会接受EVENT_STORE表更改,并在KAFKA-TOPIC中使用列TYPE的值发布它们?

这是我的源代码kafka连接器配置:

{
  "name": "kafka-connector-source-ms-name",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@localhost:1521:xe",
    "connection.user": "squeme-name",
    "connection.password": "password",
    "topic.prefix": "",
    "table.whitelist": "EVENT_STORE",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "CREATE_AT",
    "incrementing.column.name": "ID",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "config.action.reload": "restart",
    "errors.retry.timeout": "0",
    "errors.retry.delay.max.ms": "60000",
    "errors.tolerance": "none",
    "errors.log.enable": "false",
    "errors.log.include.messages": "false",
    "connection.attempts": "3",
    "connection.backoff.ms": "10000",
    "numeric.precision.mapping": "false",
    "validate.non.null": "true",
    "quote.sql.identifiers": "ALWAYS",
    "table.types": "TABLE",
    "poll.interval.ms": "5000",
    "batch.max.rows": "100",
    "table.poll.interval.ms": "60000",
    "timestamp.delay.interval.ms": "0",
    "db.timezone": "UTC"
  }
}

共有1个答案

郝原
2023-03-14

您可以尝试提取主题转换以从字段中提取主题名称

将以下属性添加到JSON中

transforms=ValueFieldExample
transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=TYPE
 类似资料:
  • Q1)以下是我在为MySQL源创建kafka连接器时使用的配置。 为什么会创建cdc.fkw.supply.mp和_ _ debezium-heart beat . CDC . fkw . supply . MP主题? 我在这两个主题中看到了一些垃圾数据。 Q2) 有没有Restapi知道工作服务器上的kafka连接转换器配置?如果没有API,那么我们存储所有工作属性的配置文件的路径是什么? 这是

  • 我尝试使用以下配置启动JDBC接收器连接器: 但当连接器处于运行状态时,没有任务正在运行: 我多次面对这个问题,但我很困惑,因为它是随机发生的。我的问题与这个问题非常相似。如果有任何帮助,我将不胜感激! 更新。11/04/2019(不幸的是,现在我只有INFO级别日志) 最后,经过几次尝试,我通过更新现有连接器的配置crm_data-sink_db_hh启动了正在运行任务的连接器: 日志: 更新。

  • 问题内容: 我正在运行SQL Server,并且有一个存储过程。我想用WHERE IN子句做一条选择语句。我不知道列表会持续多久,所以我尝试了以下方法 在此解决方案中,@ idList是varChar(max)。但这是行不通的。我听说过要传递表值,但是我对如何做到这一点感到困惑。任何帮助都会很棒 问题答案: 我建议使用一个函数来拆分传入列表(使用Martin在其评论中放置的链接)。 将split函

  • 我正在使用现有的Java代码,其中在部署的系统上有一个现有的JDBC连接池机制,并且有一个已经存在的获取JDBC连接的设置。我想利用这一点来创建MyBatis SqlSession对象,而不创建配置、数据源和其他东西 我的代码已经创建了对象,并为其提供了所需的资源。我想利用这一点,获得对象,并从此使用MyBatis。 我不希望MyBatis管理连接池,确定使用哪个数据源等等,这可能吗?

  • 无法使用Ksqldb创建Kafka->Cassandra接收器连接器: 创建接收器连接器cassandra(“CONNECTOR.class”=“io.confluent.connect.cassandra.CassandrasinkConnector”,“tasks.max”=“1”,“topics”=“tst”,“cassandra.contact.points”=“cassandra”,“c

  • 我正在尝试使用快速连接将JProfiler(10.0.1)连接到JMX端口- “另一个应用程序正在端口19002上运行。请检查您的端口配置”。 我使用的是oracle JDK jdk1.8.0_181。下面是我用于服务器启动的JXM选项。