当前位置: 首页 > 知识库问答 >
问题:

Kafka s3 json连接器

甄鹏云
2023-03-14

我尝试使用最新的kafka (confluent-platform-2.11)连接将Json放到s3上。我在quickstart-s3.properties文件中设置format . class = io . confluent . connect . S3 . format . JSON . JSON format

和负载连接器:

~$ confluent load s3-sink  {   "name": "s3-sink",   "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "s3_hose",
    "s3.region": "us-east-1",
    "s3.bucket.name": "some-bucket-name",
    "s3.part.size": "5242880",
    "flush.size": "1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "name": "s3-sink"   },   "tasks": [
    {
      "connector": "s3-sink",
      "task": 0
    }   ],   "type": null }

然后我给Kafka发了一行:

~$ Kafka-控制台-生产者 --经纪人列表 本地主机:9092 --主题 s3_hose

{“q”: 1}

我在连接器日志中看到Avro转换异常

[2018-01-14 14:41:30,832] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runti me.WorkerTask:172) org.apache.kafka.connect.errors.DataException: s3_hose
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:96)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

为什么有尝试使用一些Avro转换器,如果我设置format.class=io.confluent.connect.s3.format.json.JsonFormata?

共有1个答案

安毅
2023-03-14

该消息指的是转换器。这与最终的输出格式不同。它用于将Kafka中的数据转换为connect data API格式,这样连接器就有了标准的工作内容。要设置转换器,您可以

1)将key.converter和value.converter设置为worker属性文件中的内置JsonConverter,以便它成为worker中运行的所有连接器的默认设置

2) 设置密钥。转换器和值。连接器级别的转换器属性,以覆盖在工作级设置的属性

请注意,由于这是一个接收器连接器,因此您非常希望将转换器与主题中的数据类型相匹配,以便可以正确转换。

 类似资料:
  • Presto与Cassandra/ScylLadb的默认连接数是多少?如何设置此属性?谢谢

  • **dataframe2:从另一个来源获得的键的Dataframe(这些键是上表中ID列的分区键)-此表中不同键的数量约为0.15万** 现在,此代码总是导致“com.datastax.oss.driver.api.core.servererrors.ReadFailureException:在一致性LOCAL_ONE读取查询期间Cassandra失败(需要1个响应,但只有0个副本响应,1个失败)

  • 代码片段如下所示: 如果有人有决议,请帮忙?

  • kafka jdbc接收器连接器是否支持将其使用的内容写入不同的主题。我正在寻找一种传递机制,如下图所示。如果没有,我可以链接一个接收器和源(从接收器写的地方读取),但我认为这不会有那么好的性能。也许我可以修改现有的接收器连接器来实现这一点?

  • 本文向大家介绍HTTP长连接、短连接?相关面试题,主要包含被问及HTTP长连接、短连接?时的应答技巧和注意事项,需要的朋友参考一下 在HTTP/1.0中默认使用短连接。也就是说,客户端和服务器每进行一次HTTP操作,就建立一次连接,任务结束就中断连接。当客户端浏览器访问的某个HTML或其他类型的Web页中包含有其他的Web资源(如JavaScript文件、图像文件、CSS文件等),每遇到这样一个W

  • 问题内容: 我在两个表之间有多对多关系。 表包含我的餐厅。 表包含不同的类别。 表包含两列,每列分别包含两个表的ID。 以下陈述是我能想到的,但没有给我我想要的输出。 我希望输出是有关餐厅的信息,并在最后一列中是类别的连接行。 问题答案: 要串联值,可以使用。xml路径解决方案有误,应使用和特殊字符。 您也可以使用变量解决方案

  • 有人能告诉我在maven中scm连接和developerConnection之间的区别吗? 我正在尝试使用,它需要其中之一。 [错误]未能执行goal org . Apache . maven . plugins:maven-release-plugin:2 . 3 . 2:在项目was-topology-legacy-dsl上准备(default-cli ):缺少必需的设置:必须指定scm连接或

  • 在Apache Flink流处理中,连接操作与连接有何不同,因此CoProcessFunction和ProcessJoinFunction有何不同,这是CoProcessFunction提供的onTimer函数吗?您能否提供一个适用于以相互排斥的方式连接/连接的示例用例。