当前位置: 首页 > 知识库问答 >
问题:

Kafka连接问题

邢飞雨
2023-03-14

我在CentOS7(confluent)上安装了Apache Kafka,正试图以分布式模式运行filestream Kafka connect,但收到以下错误:

[2017-08-10 05:26:27,355] INFO Added alias 'ValueToKey' to plugin 'org.apache.kafka.connect.transforms.ValueToKey' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:290)
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "internal.key.converter" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:463)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:197)
at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:289)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:65)

现在可以通过更新workers.properties(如http://docs.confluent.io/current/connect/userguide.html#connect-userguide-distributed-config所述)来解决这个问题

使用的命令:

/home/arun/kafka/confluent-3.3.0/bin/connect-distributed.sh ../../../properties/file-stream-demo-distributed.properties

FILESTREAM属性文件(workers.properties):

name=file-stream-demo-distributed
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/demo-file.txt
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
group.id=""
bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
config.storage.topic=demo-2-distributed
offset.storage.topic=demo-2-distributed
status.storage.topic=demo-2-distributed
group.id=""
kafka-console-consumer --zookeeper localhost:2181 --topic demo-2-distributed --from-beginning

我想我错过了一些最基本的东西。谁能帮忙吗?

共有1个答案

龚伯寅
2023-03-14

您需要为Kafka connect框架定义唯一的主题以存储其配置、偏移量和状态。

在workers.properties文件中,将这些参数更改为如下所示:

config.storage.topic=demo-2-distributed-config
offset.storage.topic=demo-2-distributed-offset
status.storage.topic=demo-2-distributed-status

这些主题用于存储connect的状态和配置元数据,而不用于存储connect上运行的任何连接器的消息。不要在这三个主题中的任何一个上使用控制台消费者,并期望看到消息。

{
  "name": "MyFileSink",
  "config": {
      "topics": "mytopic",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "tasks.max": 1,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "file": "/tmp/demo-file.txt"
    }
}

分布式工作程序运行后,您需要使用curl将配置文件应用到它,如下所示:

curl -X POST -H "Content-Type: application/json" --data @file-sink-config.json http://localhost:8083/connectors

之后,配置将安全地存储在您为所有分布式工作者创建的配置主题中。确保config主题(以及status和offset主题)不会使消息过期,否则会使连接器配置松散。

 类似资料:
  • 我用的是Kafka 0.8.2-beta,有2台Ubuntu 14虚拟机: 172.30.141.127正在运行动物园管理员 172.30.141.184在经营一家Kafka经纪人 我正在启动动物园管理员实例,如果一切顺利的话。然后,我尝试启动代理并将其连接到172.30.141.127:2181。它似乎能够在特定的端口上连接并建立会话,但是由于一些似乎没有记录的异常,它失去了连接。 代理输出:

  • 我正在尝试对Kafka消息流进行流处理和CEP。为此,我选择Apache Ignite首先实现一个原型。但是,我无法连接到队列: 使用KAFKA2.11-0.10.1.0 apache-ignite-fabric-1.8.0-bin Kafka工作正常,我用一个消费者测试了它。然后启动ignite,然后在spring boot命令行应用程序中运行following。 当应用程序启动时,我得到 20

  • 这个包来连接Kafka,之后实现了个连接池的功能,代码如下: 开发完之后,我打算执行交叉编译,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错): 报错如下:

  • 我在加入一个KStream和一个GlobalKTable时遇到了一个问题,希望能得到您的帮助。 给定两个Kafka主题和: 订单 客户 需求是用客户名称丰富订单流 我正在尝试以下操作: null

  • 首先我使用了 "github.com/confluentinc/confluent-kafka-go/kafka" 这个包来连接Kafka,之后实现了个连接池的功能,代码如下: 开发完之后,我打算执行交叉编译,编译为Linux下可执行文件,打包脚本如下:(之前使用MQ的时候就没有问题,换成了kafka就报错): 报错如下: 十分不理解,这是什么原因呢?换成kafka之后,不能进行交叉编译么?请各位

  • 在与docker和kafka的基础上磕磕绊绊,无法获得客户端连接 到目前为止我所做的 docker-机器活动,不返回活动主机 我的groovy类(从一个示例中剪切和粘贴,连接如下所示 当我运行这个init时,我得到的错误是它不能解析连接,因为java.io.ioException:不能解析地址:7BF9F9278E64:9092,这是内部容器端口。(我的脚本正在从我的普通IDE桌面环境中调用) 感