当前位置: 首页 > 知识库问答 >
问题:

自定义Kafka Connect-ElasticSearch接收器连接器

程淮晨
2023-03-14

我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。

我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。

示例 Json 如下所示: Sample1: {“log”:{“data”:“information”, “version”:“1.1”}, “type”:“xyz”, “timestamp”:“2019-08-28t10:07:40.370z”, “value”:{}} ,

示例2:{"日志":{"数据":"信息","版本":"1.1","值":{}}, "类型":"abc","时间戳":"2019-08-28t10:07:40.370z"}

我想定制/配置Kafka connect ES接收器,将Sample1文档写入索引'xyz。20190828’和Sample2文档索引为“abc.20190828”。

我用的是Kafka-2.2.0,和confluent Inc-Kafka-connect-elastic search-5 . 2 . 1插件。

感谢你的帮助。

共有1个答案

丌官博文
2023-03-14

您可以使用自定义的单消息转换(SMT)来实现这一点,这需要您自己编写。通过根据内容改变邮件的主题,您可以将邮件发送到不同的Elasticsearch索引。

目前,Apache Kafka附带了一个SMT,它可以重命名整个主题(< code>RegExRouter)或添加时间戳(< code>TimestampRouter)。你可能会发现这是一个有用的写作起点。

另一种选择是正如@wardzniak在他的评论中建议的那样——在使用Kafka Connect将生成的单独主题发送到Elasticsearch之前,使用流处理(例如Kafka Streams、KSQL)对源主题进行预处理。

 类似资料:
  • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误

  • 我正在尝试使用docker容器中的kafka connect和一个自定义连接器(PROGRESS _ DATADIRECT _ JDBC _ OE _ all . jar)来连接openedge数据库。 我将JAR文件放在插件路径(usr/share/java)中,但它不会作为连接器加载。 我可以通过将另一个(标准)连接器放在插件路径中来加载它。这行得通 有点不知道如何前进,我对Kafka很陌生。

  • 目标是:开发一个自定义Kafka连接器,该连接器以循环方式从websocket读取消息。我试着给你们举一个我所认识到的例子: 我创建了一个接口IWebsocketClientEndpoint 以及实现上述接口的类: WebsocketClientEndpoint类专用于创建websocket并管理连接、断开连接、发送和接收消息。 目标是:如何在Kafka连接结构中调整我的websocket结构?我

  • 在标准/自定义kafkaconnect接收器中,我们如何指定它应该只使用来自kafka主题的read_comitted消息。我可以在这里看到配置,但看不到任何选项(除非这是默认行为)。谢了。https://docs . confluent . io/current/installation/configuration/connect/sink-connect-configs . html

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka