我正在为 Elasticsearch 编写一个 Kafka Sink 连接器。 我实现了启动,把,刷新,关闭方法在Sink任务类。 但是,我不知道Kafka Sink Connector的行为到底起什么作用。 如果Connect Worker重复执行所有这些任务,即通过< code>put()方法从Kafka代理获取SinkRecord,在内部对其进行处理,然后将数据发送到Elasticsearc
我正在使用Confluent Kafka一体式docker图像在DigitalOcean水滴上设置Kafka。我能够成功运行Kafka并使用Kafka Connect REST API添加HDFS连接器。我用Cloudera CDH液滴的IP替换了HOST_IP。 然后,当我卷曲Kafka Connect以获取hdfs接收器状态时,我在任务下的JSON响应中收到以下错误(服务状态正在运行,但任务失
我使用Kafka和Kafka Connect将MS SQL Server数据库复制到MySQL,使用debezium SQL Server CDC源连接器和汇合的JDBC汇连接器。“auto.create”设置为true,接收连接器确实创建了表,但某些数据类型不匹配。在SQL Server中,我有 但在 MySQL 中,它创建了以下内容: 忽略消息,这是我在 SMT 中添加的额外字段。 名字、姓氏
我需要从Kafka主题获取消息并通过基于HTTP的API通知其他系统。也就是说,从主题获取消息,映射到第三方API并调用它们。我打算为此编写一个Kafka Sink连接器。 对于这个用例,Kafka Connect是正确的选择还是我应该使用Kafka客户端。
有没有办法通过Kafka Connect S3接收器连接器标记写入S3存储桶的对象。我正在读取来自Kafka的消息,并使用S3接收器连接器将avro文件写入S3存储桶。当文件写入S3存储桶时,我需要标记文件。
我使用自己的自定义Sink插件运行Kafka Connect集群(本地有1个工人Docker Compose)。我想在连接器中使用几个主题:topicA、topicB、topicC,每个主题都有一个分区。 我的连接器启动时的配置子集如下: 使用此配置,我希望Kafka Connect为每个接收器任务分配一个主题,但遗憾的是,这不是我看到的。实践中发生的情况是,为分配了所有主题的每个任务调用Sink
我已经使用kafka source connector将文档从Couchbase传输到kafka。这些文档然后被复制到Mongo DB。 沙发底座 -- 如果源连接器关闭,那么如何再次将所有文档同步到 Kafka? 有没有什么get和touch功能可以将kafka主题在关闭期间所做的所有更改都显示出来?
我试图使用来自kafka的消息,源消息以Avro格式序列化(我使用了AWS模式注册表)。 连接器配置: 但是当我尝试配置接收器连接器时,它会出现以下错误。
我有一个 Confluent 接收器连接器,它正在从 Kafka 主题获取数据。然后,它会摄取到 S3 存储桶中。 摄取工作正常,一切都很好,但是现在我需要在将Avro数据放入存储桶之前对其进行压缩。 我已经尝试了以下配置 “avro.code”,我以为会压缩数据,但它没有。取而代之的是,我还尝试了“ ”s3.compression.type“: ”snappy“ ',仍然没有运气!但是,这确实适
我在 AWS S3 中备份了以下文件,这些文件由 Kafka 连接接收器连接器备份: 当使用Kafka connect S3源恢复主题时,密钥文件被忽略,我在日志中看到以下调试消息: 我的源配置如下所示: 我应该做什么改变才能让密钥和消息一起存储在Kafka中?
我正在使用JDBC Kafka连接器将数据从数据库读取到Kafka中。这有效,但它总是以 Avro 格式输出数据,即使我已经指定它应该使用 JSON。我知道它正在这样做,因为当我在 python 中使用来自该主题的消息时,我在每条消息的顶部看到架构。 我像这样运行连接器: connect-JSON-standalone . properties文件的内容是: 和/etc/Kafka-connect
我正在使用融合的 http 接收器连接器从 kafka 主题读取消息并将其发送到endpoint。下面是 http 接收器连接器, 主题中的消息具有以下 json 格式, 因此,使用超文本传输协议接收器连接器,我需要从主题消息中检索“endpointurl”和“凭据”的值,并在同一连接器中使用与“http.api.url”、“connection.user”和“connection.passwor
使用最新的 kafka 和 confluent jdbc 接收器连接器。发送一个非常简单的 Json 消息: 但是出现错误: Jsonlint说Json是有效的。我在 kafka 配置中保留了 json 。有什么指示吗?
我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases) 我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时
我目前正在使用ConFluent HDFS Sink连接器(v4.0.0)来替换Camus。我们正在处理敏感数据,因此我们需要在切换到连接器期间保持偏移的一致性。 移交计划: 我们创建了hdfs接收器连接器,并订阅了一个主题,该主题将写入临时hdfs文件。这将创建一个名为connect-的用户组 已使用DELETE请求停止连接器 使用/usr/bin/kafka使用者组脚本,我可以将连接器使用者组