当前位置: 首页 > 知识库问答 >
问题:

KAFKA connect EOFException(当使用汇流代理和源连接器运行独立连接时)

苏胤
2023-03-14

我正在尝试使用示例soure连接器运行一个connect独立应用程序。(FileSource或JDBC连接器)。我不断重复错误消息,例如

[2022-11-14 18:33:09,641] DEBUG [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Connection with xxxx.westeurope.azure.confluent.cloud/ (channelId=-1) disconnected (org.apache.kafka.common.network.Selector:606)
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:97)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
    at java.base/java.lang.Thread.run(Thread.java:1589)
[2022-11-14 18:33:09,643] INFO [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:935)
[2022-11-14 18:33:09,645] DEBUG [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Cancelled in-flight API_VERSIONS request with correlation id 0 due to node -1 being disconnected (elapsed time since creation: 34ms, elapsed time since send: 34ms, request timeout: 30000ms): ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.2.1') (org.apache.kafka.clients.NetworkClient:335)
[2022-11-14 18:33:09,647] WARN [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Bootstrap broker xxxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1063)
[2022-11-14 18:33:09,757] DEBUG [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Initialize connection to node xxxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1160)
[2022-11-14 18:33:09,758] DEBUG [local-file-source|task-0] Resolved host xxxx.westeurope.azure.confluent.cloud as  (org.apache.kafka.clients.ClientUtils:113)
[2022-11-14 18:33:09,758] DEBUG [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Initiating connection to node xxxx.westeurope.azure.confluent.cloud:9092 (id: -1 rack: null) using address xxxx.westeurope.azure.confluent.cloud/ (org.apache.kafka.clients.NetworkClient:989)
[2022-11-14 18:33:09,787] DEBUG [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 (org.apache.kafka.common.network.Selector:531)
[2022-11-14 18:33:09,788] DEBUG [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Completed connection to node -1. Fetching API versions. (org.apache.kafka.clients.NetworkClient:951)
[2022-11-14 18:33:09,789] DEBUG [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Initiating API versions fetch from node -1. (org.apache.kafka.clients.NetworkClient:965)
[2022-11-14 18:33:09,789] DEBUG [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Sending API_VERSIONS request with header RequestHeader(apiKey=API_VERSIONS, apiVersion=3, clientId=connector-producer-local-file-source-0, correlationId=1) and timeout 30000 to node -1: ApiVersionsRequestData(clientSoftwareName='apache-kafka-java', clientSoftwareVersion='3.2.1') (org.apache.kafka.clients.NetworkClient:521)
[2022-11-14 18:33:09,817] DEBUG [local-file-source|task-0] [Producer clientId=connector-producer-local-file-source-0] Connection with xxxx.westeurope.azure.confluent.cloud/ (channelId=-1) disconnected 

我可以用kafka-topics.sh命令创建一个主题,通过控制台生产者向主题写入消息,并通过控制台消费者以及带有接收器连接器的connect-standalone从主题读取消息。

如果我在本地运行kafka服务器和zookeper,一切似乎都很好。

命令行:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

连接-独立.属性

bootstrap.servers=pkc-pj9zy.westeurope.azure.confluent.cloud:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="name" password="passphrase";
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
plugin.path=./plugins,./libs
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

连接文件源属性

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=test_ksql_af_file_source-test
auto.create=true
auto.evolve=true

共有1个答案

包永新
2023-03-14

已经解决了。

缺少属性

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="name" password="passphrase";

在我的设置中。

不幸的是,日志没有对此给出任何提示

 类似资料:
  • 在我的程序中,我正在访问wep api。最多可以有7个不同的线程访问web api的不同服务器。每个线程负责一个服务器,每个服务器速率限制每个线程。每个线程更新相同的mysql数据库。线程数保持不变。 在我的示例中,是否需要连接池?我不应该只打开7个不同的连接,这些连接将在程序的生命周期中打开吗?

  • 我想使用Confluent的JDBC源连接器将数据从SQL Server表检索到Kafka中。 任何帮助都将不胜感激。

  • 我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我

  • 我们试图在给定的节点上启动多个独立的kafka hdfs连接器。 对于每个连接器,我们分别将和设置为不同的端口和路径。 也是Kafka经纪人JMX港是@ 9999。 当我启动 kafka 独立连接器时,出现错误 错误:代理引发异常:java.rmi.server。ExportException:端口已在使用:9999;嵌套异常是:java.net。BindException:地址已在使用中(绑定失

  • 我正在尝试从kafka中的主题将数据插入postgres数据库。我正在使用以下命令加载 sink-quick start-MySQL . properties如下 我得到的错误是 Postgres jar文件已经在文件夹中。有人能提出建议吗?

  • 问题内容: 在python中,我有变量和。我想把它们串联起来获得。但是在Windows下,我应该使用和用于POSIX 。 如何使该平台独立? 问题答案: 您要为此使用os.path.join()。 使用此方法而不是使用字符串连接等方法的优势在于,它知道各种特定于OS的问题,例如路径分隔符。例子: 在 Windows 7下 : 在 Linux下 : 所述OS模块包含目录,路径操纵并找出OS特定信息许

  • 我已经在本地下载了wiremock独立jar。我使用下面的命令启动独立服务器。java-jar wiremock-jre8-standalone-2.26.3.jar--端口8089 我需要一个可以连接到我的wiremock服务器并显示所有映射的用户界面。如果UI能够提供永久编辑、删除和添加新文件的特征,这将是有利的。

  • 问题内容: 我们正在经历升级数据库软件的过程,还将从Tomcat 5.5升级到Tomcat7。结果,我使用的是新的JDBC驱动程序,在本例中为推荐的SQL Anywhere JDBC 4.0驱动程序,它需要ODBC服务。 我将其全部用于Eclipse方面取得了巨大的成功。但是奇怪的是,当我尝试在Eclipse之外运行Tomcat时,出现以下错误: 我 可以 得到一个纯Java驱动程序(jConne