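I am setting up Kafka on a DigitalOcean droplet using the Confluent Kafka all-in-one Docker image. I was able to run Kafka successfully and add the HDFS connector through the Kafka Connect REST API. I replaced HOST_IP with the IP of my Cloudera CDH droplet.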
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "test_hdfs",
"hdfs.url": "hdfs://HOST_IP:8020",
"flush.size": "3",
"name": "hdfs-sink"
}}' \
http://HOST_IP:8083/connectors
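Then, when I curl Kafka Connect for the hdfs-sink status, I get the following error in the JSON response under tasks (the connector state is RUNNING, but the task has FAILED):
java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.SubjectNameStrategy
Update: I managed to get past this error by using 5.0.0 instead of the beta, as cricket007 recommended (silly me).
However, when I actually try to publish data to my HDFS instance, I get a different error. I am using ksql-datagen to generate fake data:
docker-compose exec ksql-datagen ksql-datagen quickstart=users format=json topic=test_hdfs maxInterval=1000 \
propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092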
{
"name": "hdfs-sink",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [{
"state": "FAILED",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: test_hdfs\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n",
"id": 0,
"worker_id": "connect:8083"
}],
"type": "sink"
}
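A status response like this one is easier to read once the trace is boiled down to its failing task and innermost cause. The following is only a rough sketch in Python: the helper names `failed_tasks` and `root_cause` are made up for illustration, and the sample is an abbreviated copy of the response, not the full trace.

```python
import json


def failed_tasks(status):
    """Return (task id, first line of trace) for every FAILED task."""
    failures = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else ""
            failures.append((task["id"], first_line))
    return failures


def root_cause(trace):
    """Return the last 'Caused by:' line of a Java stack trace, if any."""
    causes = [line for line in trace.splitlines() if line.startswith("Caused by:")]
    return causes[-1] if causes else None


# Abbreviated copy of the status response (stack frames omitted).
sample = json.loads(r'''
{
  "name": "hdfs-sink",
  "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
  "tasks": [{
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\nCaused by: org.apache.kafka.connect.errors.DataException: test_hdfs\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n",
    "id": 0,
    "worker_id": "connect:8083"
  }],
  "type": "sink"
}
''')

for task_id, summary in failed_tasks(sample):
    print(task_id, summary)
print(root_cause(sample["tasks"][0]["trace"]))
```

The last "Caused by:" line is usually the one worth searching for; here it is the "Unknown magic byte!" message.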
Edit 2
Stack trace of the failing Avro ksql-datagen run:
Outputting 1000000 to test_hdfs
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing row to topic test_hdfs using Converter API
Caused by: org.apache.kafka.connect.errors.DataException: test_hdfs
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:77)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:44)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:27)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:854)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:816)
at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:94)
at io.confluent.ksql.datagen.DataGen.main(DataGen.java:100)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1334)
at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1309)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:172)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:320)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:312)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:114)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:153)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:116)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:44)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:27)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:854)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:816)
at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:94)
at io.confluent.ksql.datagen.DataGen.main(DataGen.java:100)
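Edit 3
OK, so for some reason, even though I am generating Avro data with ksql-datagen, I am still getting a JSON serialization error on Kafka Connect.
docker-compose exec ksql-datagen ksql-datagen schema=/impressions.avro format=avro schemaRegistryUrl=http://schema-registry:8081 key=impressionid topic=test_hdfs maxInterval=1000 \
propertiesFile=/etc/ksql/datagen.properties bootstrap-server=broker:9092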
curl -X POST \
-H "Content-Type: application/json" \
--data '{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
"tasks.max": "1",
"schema.compatibility": "FULL",
"topics": "test_hdfs",
"hdfs.url": "hdfs://cdh.nuvo.app:8020",
"flush.size": "3",
"name": "hdfs-sink"
}}' \
http://kafka.nuvo.app:8083/connectors
Schema Registry config:
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
Kafka Connect logs:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'impression_816': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"impression_816"; line: 1, column: 29]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'impression_816': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"impression_816"; line: 1, column: 29]
Edit 4
[2018-08-22 02:05:51,140] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: test_hdfs1
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2018-08-22 02:05:51,141] ERROR WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2018-08-22 02:05:51,243] INFO Publish thread interrupted for client_id=consumer-8 client_type=CONSUMER session= cluster=lUWD_PR0RsiTkaunoUrUfA group=connect-hdfs-sink (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
You set ksql-datagen ... format=json
But the error shows that you have configured the Avro converter in Kafka Connect:
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Looking at your Compose file...
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
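For background on "Unknown magic byte!": the Avro converter expects every message to follow the Schema Registry wire format, which is a single zero byte, then a 4-byte big-endian schema ID, then the Avro-encoded body. Plain JSON or string data does not start that way. Here is a rough sketch in Python for eyeballing raw message bytes; `describe_payload` is a hypothetical helper, and the check is only a heuristic, since arbitrary bytes can also happen to start with zero.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Schema Registry wire format


def describe_payload(raw):
    """Guess the encoding of raw Kafka message bytes from their leading bytes."""
    if raw is None or len(raw) == 0:
        return "empty"
    if raw[0] == MAGIC_BYTE and len(raw) >= 5:
        # Bytes 1-4 hold the schema ID as a big-endian 32-bit integer.
        (schema_id,) = struct.unpack(">I", raw[1:5])
        return f"confluent-avro (schema id {schema_id})"
    if raw[:1] in (b"{", b"["):
        return "looks like JSON"
    return "plain bytes or string"


# A JSON value, as produced with format=json.
print(describe_payload(b'{"userid": "User_1"}'))
# A plain string key such as the one in the JsonConverter error.
print(describe_payload(b"impression_816"))
# Magic byte, schema ID 7, then a made-up Avro body.
print(describe_payload(b"\x00\x00\x00\x00\x07\x02a"))
```

Anything that does not come back as "confluent-avro" will make AvroConverter fail in the way shown in the trace.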
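If you want to produce Avro data, refer to the ksql-datagen documentation.
Even though you are currently producing JSON, that is not what your configuration will write to HDFS.
Avro is the default output format of HDFS Connect, as the configuration docs show:
format.class
The format class to use when writing data to the store. Format classes implement the io.confluent.connect.storage.format.Format interface.
Type: class
Default: io.confluent.connect.hdfs.avro.AvroFormat
Importance: high
These classes are available by default:
If you are not using JsonFormat, I believe that in order to output Avro from JSON you need JSON records that look like this: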
{
"schema": {...},
"payload": {...}
}
Otherwise, an Avro schema cannot be inferred from a JSON record.
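To make the schema/payload shape concrete, here is a minimal sketch in Python that wraps a flat record in the envelope JsonConverter reads when schemas.enable=true. The helper `to_connect_envelope`, the schema name `example.User`, and the sample fields are all assumptions for illustration; the type names ("string", "int64", and so on) are the Connect JSON schema types.

```python
import json


def to_connect_envelope(record, field_types, name="example.User"):
    """Wrap a flat record in a schema/payload envelope.

    field_types maps each field name to a Connect JSON schema type.
    """
    schema = {
        "type": "struct",
        "name": name,  # hypothetical schema name
        "optional": False,
        "fields": [
            {"field": field, "type": type_name, "optional": False}
            for field, type_name in field_types.items()
        ],
    }
    return json.dumps({"schema": schema, "payload": record})


# Illustrative record; the field names are assumptions, not taken from the thread.
message = to_connect_envelope(
    {"userid": "User_1", "registertime": 1534903551000},
    {"userid": "string", "registertime": "int64"},
)
print(message)
```

This only applies when the producer writes JSON and the worker uses JsonConverter with schemas enabled; with AvroConverter the schema comes from the Schema Registry instead.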
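Judging from your series of edits, I think you switched to producing Avro but then used JsonConverter based on what I mentioned above, which is not what I was suggesting. Basically, the converter class must match the data the producer wrote, because it determines the deserializer the consumer side uses.
As for the serialization error for id -1, it is basically saying that the data in the key or the value is not Avro. Now, KSQL does not work well with Avro keys, so I would bet it is the key deserialization that is failing. To fix that, set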
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
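Converters can also be overridden per connector rather than in the worker properties. As a rough sketch (assuming the value really is Avro and that the registry is reachable at http://schema-registry:8081, as in the Compose file), the connector config could be built like this in Python; `hdfs_sink_config` is a hypothetical helper, and the resulting JSON would be sent to the Connect REST API, for example with PUT /connectors/hdfs-sink/config.

```python
import json


def hdfs_sink_config(topic, hdfs_url, schema_registry_url):
    """Build an HDFS sink config with per-connector converter overrides."""
    return {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "1",
        "topics": topic,
        "hdfs.url": hdfs_url,
        "flush.size": "3",
        # Pass keys through as raw bytes instead of decoding them as Avro.
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        # Values are still expected to be Avro registered in the Schema Registry.
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": schema_registry_url,
    }


config = hdfs_sink_config(
    "test_hdfs",
    "hdfs://HOST_IP:8020",
    "http://schema-registry:8081",
)
print(json.dumps(config, indent=2))
```

Keeping the override on the connector leaves the worker defaults untouched for any other connectors.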