当前位置: 首页 > 面试题库 >

Kafka Elasticsearch连接器时间戳

潘嘉颖
2023-03-14
问题内容

我可以看到例如在这里进行了几次讨论,但是我认为由于Elasticsearch中的重大更改,解决方案已过时。

我正在尝试将我在Kafka主题中的Json中的long / epoch字段转换为通过连接器推送的Elasticsearch日期类型。

当我尝试添加动态映射时,我的Kafka连接更新失败,因为我试图将两个映射应用于字段_doc和kafkaconnect。我认为这是关于版本6的重大更改,我相信每个索引只能有一个映射。

{
    "index_patterns": [ "depart_details" ],
  "mappings": {
    "dynamic_templates": [
      {
        "scheduled_to_date": {
          "match":   "scheduled",
          "mapping": {
            "type": "date"
          }
        }
      } 
    ]
}}

我现在专注于尝试通过将字段更改为时间戳,时间或日期来在连接器中的源处翻译消息。

    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.TimestampConverter.field" : "scheduled",
        "transforms.TimestampConverter.target.type": "Timestamp"

但是,我尝试通过此转换器发送的任何消息均失败

Caused by: org.apache.kafka.connect.errors.DataException: Java class class java.util.Date does not have corresponding schema type.
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:604)
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:668)
    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:574)
    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:324)
    at io.confluent.connect.elasticsearch.DataConverter.getPayload(DataConverter.java:181)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:163)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:285)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:270)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:169)

似乎需要做的事情确实很普通,但是我看不到如何通过版本7中的此连接器将日期或时间字段输入到Elastic中?


问题答案:

Confluent文档指出ES 7 当前不支持 ES连接器。

根据此问题,更改type.name=kafkaconnecttype.name=_doc连接器配置可能就足够了。



 类似资料:
  • 我正在使用部署在Kafka Connect中的Debezium MySQL连接器,将MySQL更改流式传输到Kafka主题,并从中获取这些消息,从而丰富数据并将数据推送到另一个MySQL。 源和接收器都是MySQL。 我的源表中有几个列,列数据类型为TIMESTAMP。 创建时间:2021-10-06 09:32:46 我可以在Kafka的信息中看到上述数据,如下所示 “创建时间”:“2021-1

  • **dataframe2:从另一个来源获得的键的Dataframe(这些键是上表中ID列的分区键)-此表中不同键的数量约为0.15万** 现在,此代码总是导致“com.datastax.oss.driver.api.core.servererrors.ReadFailureException:在一致性LOCAL_ONE读取查询期间Cassandra失败(需要1个响应,但只有0个副本响应,1个失败)

  • 试图在localhost中建立从app容器到mysql容器的连接,出现连接拒绝异常 我们正在采取一种docker的方法来调用rest api服务来采用微服务的方法。我们正在建立应用程序容器和mysql容器之间的连接,同时我们编写了一个docker-compose文件,创建了mysql容器和应用程序容器,为这两个容器公开了端口。下面是运行docker-compose文件docker-compose

  • 问题内容: 在这里使用新的logstash jdbc连接器: https://www.elastic.co/guide/zh-CN/logstash/current/plugins-inputs- jdbc.html 后续logstash运行如何影响已经编入ElasticSearch的内容?是在ES索引中创建新文档,还是更新与已经被索引的行匹配的文档?我尝试解决的用例是将带有时间戳的行索引到ela

  • 下面是/etc/kafka/connect-MongoDB-source.properties中的MongoDB配置 但是低于误差 以独立模式运行连接器。 我在debezium-debezium-连接器-mongob-1.0.0/debezium-connector-mongodb-1.0.0.Final.jar 类路径的设置如下 使用插件路径,我看到它能够注册和加载所有必需的插件。 但最后还是同

  • 我正试图连接到mysql服务器,但这需要5秒钟。