当前位置: 首页 > 知识库问答 >
问题:

基于查询的JDBC源连接器Kafka

彭令秋
2023-03-14
create table test(
id varchar(20) primary key,
name varchar(10) 
);

INSERT INTO test(
    id, name)
VALUES ('1ab', 't'),
('2ab', 't'),
('3ab', 't')

{"name" : "test_connector",
    "config" : {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://12.34.5.6:5432/",
        "connection.user": "user",
        "connection.password": "password",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic.prefix": "incre_",
        "mode": "incrementing",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
        "incrementing.column.name":"id",
        "value.converter.schema.registry.url": "http://schema-registry_url.com",
        "key.converter.schema.registry.url": "http://schema-registry_url.com",
        "offset.flush.timeout.ms": 2000,

    }
}

在我发布配置之后,当我执行HTTP curl时,状态是running。在worker的日志中也没有错误日志,当我检查它时,在kafka主题中也没有数据。当我尝试做一个控制台-消费者时,我还尝试了其他几种组合,比如在“table.whitelist”中添加:“test”

我尝试的另一件事是使用这两个链接https://rmoff.net/2018/05/21/kafka-connect-and-oracle-data-types/https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector,但没有任何帮助,甚至使用了诸如SELECT*from(SELECT id,name from test where...)之类的聪明技巧

共有1个答案

卫阳炎
2023-03-14

所以几个小时后玩不同的配置。我回到官方文件,意识到这一点

使用自定义查询而不是加载表,允许您联接来自多个表的数据。只要查询不包括自己的筛选,您仍然可以对增量查询使用内置模式(在本例中,使用时间戳列)。请注意,这限制了每个连接器只能有一个输出,而且由于没有表名,因此主题“prefix”在本例中实际上是完整的主题名。

所以关键是“topic.prefix”:“incre_test”

{"name" : "test_connector",
    "config" : {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://12.34.5.6:5432/",
        "connection.user": "user",
        "connection.password": "password",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "topic.prefix": "incre_test",
        "mode": "incrementing",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "query" :"SELECT cast(replace(id, 'ab','') as integer) as id , name from test ORDER BY id ASC",
        "incrementing.column.name":"id",
        "value.converter.schema.registry.url": "http://schema-registry_url.com",
        "key.converter.schema.registry.url": "http://schema-registry_url.com",
        "offset.flush.timeout.ms": 2000,

    }
}
 类似资料:
  • 我想检索Order对象的列表。每个Order对象可能都有一个OrderRows列表。OrderRows保存在单独的表中。如何将下面的查询与Jdbctemplate一起使用?

  • 问题内容: 在这里使用新的logstash jdbc连接器: https://www.elastic.co/guide/zh-CN/logstash/current/plugins-inputs- jdbc.html 后续logstash运行如何影响已经编入ElasticSearch的内容?是在ES索引中创建新文档,还是更新与已经被索引的行匹配的文档?我尝试解决的用例是将带有时间戳的行索引到ela

  • 我正在使用合流kafka connect jdbc源将mysql表中的记录推送到我的kafka主题,但似乎日期列被转换为纪元时间。 这是我的配置: kafka主题中的输出: 我也在类似于“select from_unixtime(updated _ on)from temp”的查询中尝试了from _ unixtime(),但是那不行。 有没有办法推到YYYY-MM-DD HH:MM:SS格式的K

  • 我的架构如下所示: 这是你的小提琴 图式的快速解释:我有广告: 每个广告都有洞察力,这些洞察力告诉我们某个广告何时处于活动状态(=>ad_clicks必须>0)。 每个ad都有产品(MANY2MONE-表)。每个产品都有,它告诉我们该产品在某一天产生了多少销售额。 现在,我想获取时间范围-的所有广告,这些广告的>0(我已经做了),并计算每个广告在活动时产生了多少销售额。因此,只有当广告的ad_in

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我正在尝试在本地Docker容器中使用Kafka Connect(使用官方的ConFluent映像),以便将DB2数据推送到OpenShift(在AWS上)上的Kafka集群。我在使用DB2 JDBC-Jar时使用了ConFluent JDBC连接器。我有不同的连接器配置,因为我使用带有“transforms.create键”的SMT(创建我的密钥),并且我表中的键列有不同的名称。 以下是我的步骤