当前位置: 首页 > 知识库问答 >
问题:

debezium生成事件的Kafka连接日期处理

仉刚洁
2023-03-14

我正在使用debezium SQL Server跟踪生产基地上的更改。创建了主题,CDC的工作非常出色,但是当试图使用jdbcSinkConnector将数据转储到另一个Sql Server DB中时,我遇到了以下错误。

com.microsoft.sqlserver.jdbc.sqlserverexception:一个或多个值超出datetime2 SQL Server数据类型的值范围

在源数据库上,sql数据类型为timestamp2(7)。Kafka事件为1549461754650000000。架构类型为Int64。架构名io.debezium.time.nanotimestamp。

我找不到一种方法告诉TimestampConverter值不是以毫秒或微秒表示的,而是以纳秒表示的(无论如何,微秒不起作用)。

这是我的连接器配置

{
    "name": "cdc.swip.bi.ods.sink.contract",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "swip.swip_core.contract",
        "connection.url": "jdbc:sqlserver://someip:1234;database=DB",
        "connection.user": "loloolololo",
        "connection.password": "muahahahahaha",
        "dialect.name": "SqlServerDatabaseDialect",
        "auto.create": "false",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schemas.enable": "true",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "transforms": "unwrap,created_date,modified_date",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.created_date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.created_date.target.type": "Timestamp",
        "transforms.created_date.field": "created_date",
        "transforms.modified_date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.modified_date.target.type": "Timestamp",
        "transforms.modified_date.field": "modified_date",
        "insert.mode": "insert",
        "delete.enabled": "false",
        "pk.fields": "id",
        "pk.mode": "record_value",
        "schema.registry.url": "http://localhost:8081",
        "table.name.format": "ODS.swip.contract"
    }
}

共有1个答案

竺捷
2023-03-14

SQL Server连接器-DBZ-1419中缺少一个功能。

您可以通过编写自己的SMT来解决这个问题,在JDBC连接器处理字段之前,在接收器端进行字段转换。

 类似资料:
  • 我正在尝试使用Debezium将Amazon RDS中托管的Postgres SQL db与Kafka主题连接起来。 我正在遵循以下教程: 我的kafka和kafka connect服务启动良好,kafka connect服务还在/usr/share/java dir中接收我的debezium postgres连接器jar。 但是,在尝试通过kafka connect API使用以下curl命令附

  • 我正在尝试使用Debezium和kafka。我已经在属性中注册了一个包含3个表的连接器,并且它正常工作。下面是运行连接器的配置。 命令: 配置json: 这将成功执行,但当我列出Kafka主题时,只有最后3个主题出现,2个新的主题没有添加。

  • 我有一个现有的2个kafka服务器加载了mysql连接器。它起作用了。此外,我需要添加MongoDB连接器。我已经在我的Kafka服务器(Centos7)上安装了confluent schema registry,它可以工作,我停止/启动/重新启动,看起来没有什么问题。我在这里下载并提取了debezium Mongo插件/usr/连接器/插件/debezium连接器mongodb/ 我编辑了 /e

  • 下面是/etc/kafka/connect-MongoDB-source.properties中的MongoDB配置 但是低于误差 以独立模式运行连接器。 我在debezium-debezium-连接器-mongob-1.0.0/debezium-connector-mongodb-1.0.0.Final.jar 类路径的设置如下 使用插件路径,我看到它能够注册和加载所有必需的插件。 但最后还是同

  • 我正在尝试识别应用程序中的SQL连接泄漏。经过一些操作后,当我的应用程序处于空闲状态(用户未执行任何活动)时,我在返回的结果集中看到7个与我的数据库的连接。所有连接的状态均为,所有连接的值均为。 我正在使用连接池,但连接字符串中未指定连接生存期。这意味着如果我是对的,将使用它的默认值0。连接生存期的值为零意味着SQL server永远不应该关闭连接,对吗? 我让我的应用程序空闲一段时间(15-20

  • 根据Debezium SQL Server连接器文档,初始快照仅在连接器首次运行时激发。然而,如果我删除连接器并创建一个新的但具有相同的名称,初始快照也不能工作。这是故意的还是已知的问题?有什么需要帮忙的吗