Question:

Docker Confluent Kafka HDFS Sink is running, but the task fails

骆英纵
2023-03-14

I'm setting up Kafka on a DigitalOcean Droplet using the Confluent Kafka all-in-one Docker image. I can run Kafka successfully and add the HDFS connector via the Kafka Connect REST API. I've replaced HOST_IP below with the IP of my Cloudera CDH Droplet.

 curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://HOST_IP:8020",
    "flush.size": "3",
    "name": "hdfs-sink"
  }}' \
  http://HOST_IP:8083/connectors

Then, when I curl Kafka Connect for the hdfs-sink status, I get the following error in the JSON response under tasks (the connector state is RUNNING, but the task has FAILED):

java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.SubjectNameStrategy
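
(For reference, I'm checking the status via the standard Connect REST endpoint; a minimal sketch using the same HOST_IP placeholder as above:)

curl http://HOST_IP:8083/connectors/hdfs-sink/status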

UPDATE: I managed to get past this error by using 5.0.0 rather than the beta, as cricket007 recommended (silly me).

However, when I actually try to publish data to my HDFS instance, I get a different error. I'm using ksql-datagen to generate fake data:

docker-compose exec ksql-datagen ksql-datagen quickstart=users format=json topic=test_hdfs maxInterval=1000 \
    propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092

{
    "name": "hdfs-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "tasks": [{
        "state": "FAILED",
        "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: test_hdfs\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n",
        "id": 0,
        "worker_id": "connect:8083"
    }],
    "type": "sink"
}

EDIT 2

Stack trace of the failed Avro ksql-datagen run:

Outputting 1000000 to test_hdfs
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing row to topic test_hdfs using Converter API
Caused by: org.apache.kafka.connect.errors.DataException: test_hdfs
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:77)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:44)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:27)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:854)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:816)
    at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:94)
    at io.confluent.ksql.datagen.DataGen.main(DataGen.java:100)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:172)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:320)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:312)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:116)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:44)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:27)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:854)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:816)
    at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:94)
    at io.confluent.ksql.datagen.DataGen.main(DataGen.java:100)
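
(The Connection refused above is thrown from RestService.registerSchema, i.e. ksql-datagen could not reach the Schema Registry to register the schema. A quick reachability check from inside the container, assuming the service is named schema-registry as in my compose file:)

docker-compose exec ksql-datagen curl -s http://schema-registry:8081/subjects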

EDIT 3

OK, so for some reason I'm still getting a JSON serialization error in Kafka Connect, even though I'm generating Avro data with ksql-datagen.

docker-compose exec ksql-datagen ksql-datagen schema=/impressions.avro format=avro schemaRegistryUrl=http://schema-registry:8081 key=impressionid topic=test_hdfs maxInterval=1000 \
    propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "tasks.max": "1",
    "schema.compatibility": "FULL",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://cdh.nuvo.app:8020",
    "flush.size": "3",
    "name": "hdfs-sink"
  }}' \
  http://kafka.nuvo.app:8083/connectors

Schema Registry configuration

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

Kafka Connect log

  org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'impression_816': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"impression_816"; line: 1, column: 29]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'impression_816': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"impression_816"; line: 1, column: 29]

EDIT 4

[2018-08-22 02:05:51,140] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: test_hdfs1
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2018-08-22 02:05:51,141] ERROR WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2018-08-22 02:05:51,243] INFO Publish thread interrupted for client_id=consumer-8 client_type=CONSUMER session= cluster=lUWD_PR0RsiTkaunoUrUfA group=connect-hdfs-sink (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

1 Answer

丁志勇
2023-03-14

You set ksql-datagen ... format=json

But the error shows that you have configured the Avro converter in Kafka Connect:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
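
You can confirm what is actually in the topic by consuming a few raw records. A sketch using the console consumer inside the broker container (the service name broker is an assumption based on your datagen command):

docker-compose exec broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test_hdfs --from-beginning --max-messages 3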

Looking at your Compose file...

  CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
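
If you did want to keep producing JSON, the worker would need the JsonConverter instead. As a sketch (assuming the same Compose layout), the corresponding environment entries would be:

  CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
  CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
  CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'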

If you want to generate Avro data, refer to the ksql-datagen documentation.

Although you're currently producing JSON, that's not what your configuration will put on HDFS. Avro is the default output format for HDFS Connect, per the configuration documentation:

format.class
The format class to use when writing data to the store. Format classes implement the io.confluent.connect.storage.format.Format interface.

Type: class
Default: io.confluent.connect.hdfs.avro.AvroFormat
Importance: high

By default, these classes are available:

- io.confluent.connect.hdfs.avro.AvroFormat
- io.confluent.connect.hdfs.json.JsonFormat
- io.confluent.connect.hdfs.parquet.ParquetFormat
- io.confluent.connect.hdfs.string.StringFormat
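
For completeness: if you actually wanted JSON files on HDFS from JSON input, the relevant connector-config fragment would look roughly like this (a sketch, not a complete config):

    "format.class": "io.confluent.connect.hdfs.json.JsonFormat",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"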

Unless you use JsonFormat, I believe that in order to output Avro from JSON input, you'd need a JSON record that looks like this:

{
  "schema": {...},
  "payload": {...}
}

Otherwise, there is no way to infer an Avro schema from a JSON record.
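
As an illustration, a self-describing record for the JsonConverter (with schemas.enable=true) might look like the following; the field name is borrowed from your datagen output and is only an example:

{
  "schema": {
    "type": "struct",
    "name": "impression",
    "optional": false,
    "fields": [
      { "field": "impressionid", "type": "string" }
    ]
  },
  "payload": {
    "impressionid": "impression_816"
  }
}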

Based on your series of edits, I think you switched to producing Avro but kept using the JsonConverter, which, per what I said above, is not what I'd suggest. Basically, the converter class must match the data that was produced, and it defines the consumer deserializer.

The serialization error for id -1 basically says that the data in the key or the value is not Avro. The Schema Registry wire format prefixes every message with a zero magic byte plus a 4-byte schema id, so "Unknown magic byte!" means the record was not produced by the registry-aware Avro serializer. Now, KSQL doesn't play well with Avro keys, so my bet is that it's the key deserialization that's failing. To fix that, set

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
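
This can also be scoped to just the connector rather than the whole worker. A sketch that PUTs an updated config through the standard Connect REST API, reusing the hosts from your question:

curl -X PUT \
  -H "Content-Type: application/json" \
  --data '{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "tasks.max": "1",
    "schema.compatibility": "FULL",
    "topics": "test_hdfs",
    "hdfs.url": "hdfs://cdh.nuvo.app:8020",
    "flush.size": "3",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
  }' \
  http://kafka.nuvo.app:8083/connectors/hdfs-sink/config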