Question:

kafka-connect: error in the distributed configuration of a Cassandra sink connector

施权
2023-03-14

I'm getting a task error in the distributed configuration of my Cassandra sink connector. I'm running the command:

curl -s localhost:8083/connectors/cassandraSinkConnector2/status | jq

to get the status:

{
  "name": "cassandraSinkConnector2",
  "connector": {
    "state": "RUNNING",
    "worker_id": localhost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "localhost:8083",
      "trace": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)\n\tat org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:505)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:441)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.KafkaException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor ClassNotFoundException exception occurred\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:332)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:319)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)\n\t... 12 more\nCaused by: java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat java.lang.Class.forName0(Native Method)\n\tat java.lang.Class.forName(Class.java:348)\n\tat org.apache.kafka.common.utils.Utils.loadClass(Utils.java:338)\n\tat org.apache.kafka.common.utils.Utils.newInstance(Utils.java:327)\n\tat org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)\n\t... 15 more\n"
    }
  ],
  "type": "sink"

Stack trace:

"trace": "org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:624)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:605)
    at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:505)
    at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:441)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:865)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:110)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:880)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:876)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor ClassNotFoundException exception occurred
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:357)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:332)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:319)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:701)
    ... 12 more
Caused by: java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:338)
    at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:327)
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:355)
    ... 15 more

You can find the connector's configuration below.


{
  "name": "cassandraSinkConnector2",
  "config": {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "appartenance_de",
    "cassandra.contact.points": "localhost",
    "cassandra.kcql": "INSERT INTO app_test SELECT * FROM app_de",
    "cassandra.port": "9042",
    "cassandra.keyspace": "dev_dkks",
    "cassandra.username": "superuser",
    "cassandra.password": "password",
    "cassandra.write.mode": "insert",
    "value.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "name": "cassandraSinkConnector2"
  },
  "tasks": [
    {
      "connector": "cassandraSinkConnector2",
      "task": 0
    }
  ],
  "type": "sink"
}

New error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Record with a null key was encountered.  This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.
    at io.confluent.connect.cassandra.CassandraSinkTask.put(CassandraSinkTask.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    ... 10 more
"

1 Answer

巫马刚洁
2023-03-14

The root error is:

java.lang.ClassNotFoundException: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

The monitoring interceptors are part of Confluent Platform. You can either disable them in your Kafka Connect worker configuration, or, better, make sure that the /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar JAR is available to the Kafka Connect worker.
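
For reference, a minimal sketch of those two options, assuming a Confluent Platform layout where the worker is started from connect-distributed.properties (the script name, paths and 5.2.1 version are assumptions; adjust them to your install):

# Option 1: disable the interceptors by removing or commenting out the
# interceptor line(s) in the worker config (connect-distributed.properties),
# e.g. the consumer one from the stack trace (and its producer counterpart, if set):
#   consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor

# Option 2: make the monitoring-interceptors JAR visible to the worker before starting it
export CLASSPATH=/usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
bin/connect-distributed etc/kafka/connect-distributed.properties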

The new error you're seeing is:

org.apache.kafka.connect.errors.DataException: 
Record with a null key was encountered.  This connector requires that records from Kafka contain the keys for the Cassandra table. 
Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.

I would suggest using a Single Message Transform, as the error suggests, to key your data correctly. You can see an example of doing this here, and the documentation for the transform here.
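
As an illustration, these are the kind of entries that could be added to the "config" block shown above; this is only a sketch, and the field name id is a placeholder for whichever value field(s) make up the Cassandra table's primary key:

"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id"

After updating the connector configuration (for example with a PUT to localhost:8083/connectors/cassandraSinkConnector2/config), the failed task can be restarted through the Connect REST API:

curl -s -X POST localhost:8083/connectors/cassandraSinkConnector2/tasks/0/restart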

 类似资料:
  • 我使用的是datastax提供的spark-cassandra-connector 1.1.0。我注意到了interining问题,我不知道为什么会发生这样的事情:当我广播cassandra connector并试图在执行程序上使用它时,我重复了异常,这表明我的配置无效,无法在0.0.0连接到cassandra。 示例StackTrace:

  • 我有这个代码: 我得到以下异常: 所有主机尝试查询失败(已尝试:/127.0.0.1:9042(com.datastax.driver.core.TransportException:[/127.0.0.1:9042]无法连接)),堆栈跟踪:com.datastax.driver.core.exceptions.NoHostAvailableException:所有主机尝试查询失败(已尝试:/12

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我想重置AerospikeSink Kafka Connector偏移量,我首先删除连接器消费组()偏移量,然后重新创建它。当我使用策略重新创建时,它以正确的偏移量重新创建,但是然后,当任务状态从更改为任务时,它会从连接器的前一个实例到达的点继续处理,这会阻止从一开始就读取来自kafka的所有消息(我正在尝试再次读取来自kafka的所有消息)。 注意:使用新名称创建新连接器并不能解决问题。 使用任

  • 我们有制作人将以下内容发送给Kafka: 主题=系统日志,每天 ~25,000 个事件 topic=nginx,每天 ~5,000 个事件 topic=zeek.xxx.log,每天~100,000个事件(总计)。在最后一种情况下,有 20 个不同的 zeek 主题,例如 zeek.conn.log 和 zeek.http.log 实例充当消费者,将数据从kafka发送到elasticsearch

  • 我一直在测试kafka连接。但是对于每个连接器,我都必须去阅读连接器留档以了解连接器所需的配置。就我阅读kafka连接API留档而言,我已经看到API以获取连接器相关数据。 -返回Kafka Connect集群中安装的连接器插件列表。请注意,API仅检查处理请求的工作人员上的连接器,这意味着您可能会看到不一致的结果,尤其是在滚动升级期间,如果您添加了新的连接器罐。 根据配置定义验证提供的配置值。此