当前位置: 首页 > 知识库问答 >
问题:

Kafka jdbc connect Sink:是否可以将pk.fields用于value和key中的字段?

郭元凯
2023-03-14

我遇到的问题是,当jdbc接收器连接器使用kafka消息时,写入db时的关键变量为NULL。

但是,当我直接通过kafka-avro-consumer进行消费时,我可以看到键和值变量及其值,因为我使用了这个config:--property print.key=true。

问:是否可以确保jdbc连接器正在处理消息键变量值?

控制台kafka-avro配置

/opt/confluent-5.4.1/bin/kafka-avro-console-consumer \
    --bootstrap-server "localhost:9092" \
    --topic equipmentidentifier.persist \
    --property parse.key=true \
    --property key.separator=~ \
    --property print.key=true \
    --property schema.registry.url="http://localhost:8081" \
    --property key.schema=[$KEY_SCHEMA] \
    --property value.schema=[$IDENTIFIER_SCHEMA,$VALUE_SCHEMA]

org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "assignment_table" ("created_date","custome
r","id_type","id_value") VALUES('1970-01-01 03:25:44.567+00'::timestamp,123,'BILL_OF_LADING','BOL-123') was aborted: ERROR: null value in column "equipment_ide
ntifier_type" violates not-null constraint
  Detail: Failing row contains (null, null, null, null, 1970-01-01 03:25:44.567, 123, id, 56).  Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: null value in column "equipment_identifier_type" violates not-null constraint
task.max=1
topic=assignment
connect.class=io.confluet.connect.jdbc.JdbcSinkConnector

connection.url=jdbc:postgresql://localhost:5432/db
connection.user=test
connection.password=test

table.name.format=assignment_table
auto.create=false
insert.mode=insert
pk.fields=customer,equip_Type,equip_Value,id_Type,id_Value,cpId

transforms=flatten

transforms.flattenKey.type=org.apache.kafka.connect.transforms.Flatten$Key
transforms.flattenKey.delimiter=_

transforms.flattenKey.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flattenKey.delimiter=_

Kafka键:

{
  "assignmentKey": {
    "cpId": {
      "long": 1001
    },
    "equip": {
      "Identifier": {
        "type": "eq",
        "value": "eq_45"
      }
    },
    "vendorId": {
      "string": "vendor"
    }
  }
}

Kafka值:


{
  "assigmentValue": {
    "id": {
      "Identifier": {
        "type": "id",
        "value": "56"
      }
    },
    "timestamp": {
      "long": 1234456756
    },
    "customer": {
      "long": 123
    }
  }
}

共有1个答案

廖诚
2023-03-14

您需要告诉连接器使用键中的字段,因为默认情况下它不会。

pk.mode=record_key

但是,您需要使用键或值中的字段,而不是同时使用配置中的字段:

pk.fields=customer,equip_Type,equip_Value,id_Type,id_Value,cpId

如果设置pk.mode=record_key,则pk.fields将引用消息键中的字段。

ref:https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html#sink-pk-config-options

另请参见https://rmoff.dev/kafka-jdbc-video

 类似资料:
  • 问题内容: 我在寻找什么: 设置一个字符的一半样式的方法。(在这种情况下,一半字母是透明的) 我目前正在搜索并尝试过的内容(没有运气): 样式字符/字母一半的方法 使用CSS或JavaScript设置字符的样式 将CSS应用于字符的50% 以下是我尝试获取的示例。 为此是否存在CSS或JavaScript解决方案,还是我不得不诉诸图像?我宁愿不走图片路线,因为此文本最终会动态生成。 更新: 既然许

  • 问题内容: Servlet规范(请参阅我的上一个问题)保证同一线程将执行所有过滤器和关联的Servlet。鉴于此,如果可以使用a选项(假设您正确清理),我看不到使用传递数据有任何用处。我觉得使用有两个好处:类型安全和更好的性能,因为没有使用任何字符串键或映射(除非可能是通过(非字符串)线程id进入线程集合)。 有人可以确认我是否正确,以便我可以放弃吗? 问题答案: ThreadLocal是否比Ht

  • 阅读文档时,并不十分清楚。 我想要的是能够存储和检索简单的json文档。使用CloudSearch,似乎可以以SDF格式存储文档,然后搜索它们,但它只返回文档ID和指定字段的一小部分(我认为是200个字符)。 有没有一种方法来检索完整的文档ID只是使用CloudSearch?还是打算作为搜索和使用主存储服务的附加工具?

  • 问题内容: 今天,我在采访中面临一个问题。是否可以在Singleton类上应用继承概念?我说过,由于构造函数是私有的,因此我们无法扩展该Singleton类。 他问我的下一件事是将继承应用于该Singleton类。因此,我将Singleton的构造函数作为受保护对象,认为孩子的构造函数也已受到保护。但是我错了,孩子可以有一个等于或大于该值的修饰符。 因此,我请他在这种情况下举一个真实的例子。他没能

  • 问题内容: 在网络上搜索,尚不清楚Android开发是否支持Java 8。 在我下载/设置Java 8之前,可以让我指出任何“正式”文档,其中说Android开发是否支持Java 8 问题答案: Java 8 Android支持所有Java 7语言功能以及部分Java 8语言功能,具体取决于平台版本。 检查支持的Java 8功能 使用Java 8语言功能 我们决定直接在当前的javac和dx工具集

  • 问题内容: 我正在使用一种API,该API为我提供了XML,并且我需要从一个实际上是字符串的标签中获取一张地图。例: 有 我需要 问题在于字符串是完全动态的,因此它可以像 所以我不能只用逗号然后用等号分开。是否可以使用正则表达式将类似字符串的内容转换为地图? 到目前为止,我的代码是: 先感谢您 问题答案: 您可以使用 请参阅regex演示。 细节 -第1组:一个或多个单词字符 -等号 -第2组:除