当前位置: 首页 > 知识库问答 >
问题:

从MSSQL vis kafka connector到具有嵌套类型的elasticsearch的大容量数据更新失败

丁星火
2023-03-14
{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"prop"
         },
         {
            "type":"string",
            "optional":true,
            "field":"roles"
         }
 ],
      "optional":false
   },
   "payload":{ "prop":100, "roles":"[{"role":"actor"},{"role":"director"}]"}
Error was [{"type":"mapper_parsing_exception","reason":"object mapping for [roles] tried to parse field [roles] as object, but found a concrete value"}
name=prop-test
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=<elasticseach url>

tasks.max=1

topics=test_prop
type.name=prop

#transforms=InsertKey, ExtractId

transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=prop

transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=prop
name=test_prop_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://*.*.*.*:1433;instance=databaseName=test;
connection.user=*****
connection.password=*****
query=EXEC <store proc>
mode=bulk
batch.max.rows=2000000
topic.prefix=test_prop
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=prop

transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=prop
    bootstrap.servers=localhost:9092

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

    key.converter.schemas.enable=true
    value.converter.schemas.enable=true

共有1个答案

谭研
2023-03-14

事实上,jdbc源连接器将始终将columnname视为fieldname,将columnvalue视为Value。这种从字符串到数组的转换在现有的jdbc源连接器支持下是不可能的,它必须是自定义转换或自定义插件才能启用。

从MSSQL中获取数据并将其插入弹性搜索的最佳选择是使用logstash。它有丰富的过滤器插件,可以使来自MSSQL的数据以任何所需的格式流到任何所需的JSON输出环境(logstash/kafka主题)

流:MSSQL-->logstash-->Kakfa主题-->Kafka弹性接收器连接器-->弹性搜索

 类似资料:
  • 如何在PySpark中更改嵌套列的datatype?对于rxample,如何将value的数据类型从string更改为int? 参考:如何在pyspark中将Dataframe列从String类型更改为Double类型

  • 问题内容: 示例文档中有一个简化的文档。这对我理解非嵌套类型与嵌套类型的聚合差异很有帮助。但是,这种简化掩盖了进一步的复杂性,因此我不得不在这里扩展这个问题。 所以我的实际文件更接近以下内容: 因此,我保留了,和的关键属性,但隐藏了许多其他使情况复杂化的内容。首先,请注意,与引用的问题相比,有很多额外的嵌套:在根和“项目”之间,以及在“项目”和“ item_property_1”之间。此外,还请注

  • 我想要一个函数,该函数可以执行以下操作: 更一般地说,是否存在与此模式匹配的typeclass?

  • 问题内容: 我一直在尝试搜索包含嵌套字段的文档。我创建了这样的嵌套映射: 我的搜索如下所示: 但是我收到此错误消息: 编辑 我仍然收到此错误。我正在通过Java执行此操作,因此这是我要创建的文档: 像这样创建索引: 我认为我可能对文档编制了错误的索引。 问题答案: TLDR:输入您的嵌套类型。 假设我们有一个普通类型,并且嵌套了另一个类型: 该行是分配给的嵌套查询所必需的,如下所示: 该行似乎仅在

  • 我正在尝试将我们的电子商务搜索系统转移到弹性搜索。我们有一堆产品,每个产品都可以有多个报价(由商家出售)。文档的大致格式是 更多事实: 产品更新率高于产品更新率 这是我的问题 我的问题1。我使用嵌套类型进行报价,因为我想使用offer\u价格对产品进行排序。我读到家长/孩子不支持排序,但事实上每次更新产品都会重新索引整个产品,这让我怀疑家长/孩子是否是更好的选择 2。我想为每件退回的产品提供最好的

  • 问题内容: 使用redux更新商店中嵌套数据数组的最佳/正确方法是什么? 我的商店看起来像这样: 我有一对异步操作来更新完整的对象,但是我还有另一对操作要更新特定的数组。 我的减速器当前看起来像这样,但是我不确定这是否是正确的方法: 问题答案: React的不变性帮助器是一种创建普通旧JavaScript对象的更新版本而无需对其进行突变的便捷方法。 您为它提供了要更新的源对象和一个对象,该对象描述

  • 问题内容: 我有一种情况,我以键值对json格式doc收集一些常规信息和数据库信息(db2,oracle,sybase,informix)。 我还具有一些规则来检查上述文档是否满足特定规则,如果满意,则退还该特定文档以进行分析。 这是医生 这是它的映射 规则标准 列出文档,其中鸣叫与和具有或当。 因此,仅当以下条件匹配时,以上文档才应返回结果。 如果(name ==“ Athena”)&&(db。

  • 好吧,我一直在Kotlin试验原始数据存储,我有一个问题。我正在使用以下内容。原型文件: 这是我的序列化程序类: 和我的存储库: 因此,在我的updateValue()方法中,我可以设置“name”字段的名称,但我没有地址消息字段的setter,如street和number。编译器只显示getter。在姓名和年龄字段的另一边,我有setters。如何对这两个地址字段使用setters:街道、编号?