Question:

Kafka Connect Confluent JDBC MySQL implementation error

潘安平
2023-03-14

I am trying to connect MySQL to Kafka Connect and am getting a number of errors. I am sharing my connect-standalone.properties and mysql-jdbc-connector.properties along with the error output. My Kafka and MySQL run on different clusters, and I am using the Confluent connector but not the Confluent Platform. I downloaded version 4.1.0 of the Confluent JDBC MySQL connector.

mysql-jdbc-connector.properties

name=source-mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://1**.**.*.29:3306/kconnect?user=bigdata&password=bigdata
connection.user=bigdata
connection.password=bigdata
tasks.max=10
mode=bulk
topic.prefix=mysql-jdbc-
poll.interval.ms=3600000

connect-standalone.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=Nifi-Staging:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
plugin.path=/usr/share/java

When I run

bin/connect-standalone.sh config/connect-standalone.properties config/mysql-jdbc-connector.properties

the output is

 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig:347)
[2020-01-14 14:01:33,289] INFO WorkerSourceTask{id=source-mysql-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:200)
[2020-01-14 14:01:33,415] INFO [Producer clientId=connector-producer-source-mysql-0] Cluster ID: VgW2NunYREqVY5cHNS6snQ (org.apache.kafka.clients.Metadata:266)
[2020-01-14 14:01:43,610] INFO WorkerSourceTask{id=source-mysql-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-14 14:01:43,611] INFO WorkerSourceTask{id=source-mysql-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2020-01-14 14:01:44,319] ERROR WorkerSourceTask{id=source-mysql-0} Flush of offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSourceTask:483)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: java.nio.file.AccessDeniedException: /tmp/connect.offsets
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:472)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:46)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:84)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.nio.file.AccessDeniedException: /tmp/connect.offsets
        at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:101)
        at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:105)
        at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:99)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        ... 3 more
Caused by: java.nio.file.AccessDeniedException: /tmp/connect.offsets
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
        at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
        at java.nio.file.Files.newOutputStream(Files.java:216)
        at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:92)
        ... 6 more
[2020-01-14 14:01:44,326] ERROR WorkerSourceTask{id=source-mysql-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

2 Answers

谢英光
2023-03-14

Quick fix:

chown cp-kafka-connect:confluent /tmp/connect.offsets

I think you need to sort out the permissions here: either grant the 'cp-kafka-connect' user permissions on the directory it needs to write to, or change the UID and GID of the system service 'confluent-kafka-connect.service'.
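
For example, assuming a systemd-managed Confluent Platform install (the service and user names below are the usual defaults and may differ on your machine), the UID/GID change could look roughly like this:

sudo systemctl edit confluent-kafka-connect
# in the override file that opens, set the user and group the worker runs as:
#   [Service]
#   User=cp-kafka-connect
#   Group=confluent
sudo systemctl daemon-reload
sudo systemctl restart confluent-kafka-connect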

相诚
2023-03-14

Welcome to StackOverflow :)

The error you are seeing here:

java.nio.file.AccessDeniedException: /tmp/connect.offsets

points to the problem: the user running the Kafka Connect process does not have permission to write to the file /tmp/connect.offsets. Kafka Connect needs this file to store the connectors' progress. You should make that location writable by the user and then restart the Kafka Connect worker.
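
As a minimal sketch (the user name and alternative path below are placeholders, adjust them to whatever actually runs the worker on your host):

# check who owns the offsets file and which user runs the standalone worker
ls -l /tmp/connect.offsets
ps -o user= -p $(pgrep -f ConnectStandalone)

# either give that user ownership of the file...
sudo chown <connect-user> /tmp/connect.offsets

# ...or point the worker at a path that user can write, in connect-standalone.properties:
# offset.storage.file.filename=/var/lib/kafka-connect/connect.offsets

# then restart the standalone worker
bin/connect-standalone.sh config/connect-standalone.properties config/mysql-jdbc-connector.properties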
