使用此Kafka Connect连接器:
https://www.confluent.io/hub/confluentinc/kafka-connect-s3
我手动将其安装到我的kafka Connect Docker映像的插件中。我的目的是使用Kafka Connect将来自Kafka主题的Avro记录写入S3。
在运行时,使用Kafka Connect,我会得到以下错误:
ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed) [main]
org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.connect.avro.AvroConverter for configuration key.converter: Class io.confluent.connect.avro.AvroConverter could not be found.
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:744)
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:490)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:483)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:452)
at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:405)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:95)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
在ConFluent开源中查找该类并查看其他ConFluent Kafka Connect插件后,我的解决方案是从ConFluent Kafka Connect Avro Converter插件复制kafka-connect-avro-converter-7.1.1.jar
如下:
https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter
进入conFluentInc-kafka-contt-s3*/lib
目录。
这似乎是可行的,但似乎有点黑客,而且似乎应该有一种更好的支持、更简单的方法来实现这一点。有更好的方法吗?
理想情况下,您应该使用汇合集线器来添加连接器和转换器。
https://docs.confluent.io/platform/current/installation/docker/development.html#add-connectors-or-software
否则,可以从Maven Central或Confluent存储库下载并提取相关包,然后添加到插件路径。
我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我
我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases) 我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时
我正在尝试使用Kafka Connect Elasticsearch连接器,但没有成功。它正在崩溃,并出现以下错误: 我已经在kafka子文件夹中解压了插件的编译版本,并在connect-standalone.properties中有以下代码行: 我可以看到该文件夹中的各种连接器,但Kafka Connect不加载它们;但它确实加载了标准连接器,如下所示: 如何正确注册连接器?
我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连
我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka
我正在windows计算机上使用Kafka,并尝试使用文件源连接器生成从文件到Kafka主题的内容。首先我启动了zookeeper,然后在启动Kafka Standalone Connector时启动了Kafka server(步骤3),我收到了很多警告,ReflectionsException 我对此没有什么疑问: 1。我需要添加一些jar文件吗 2。在libs文件夹下的Kafka dir中有一