当前位置: 首页 > 知识库问答 >
问题:

Kafka连接器HDFS Sink 5.3.1无法生成所有JSON记录

壤驷鸿祯
2023-03-14

我正在阅读一个已经创建的Kafka主题,在这个主题上,一个单独的集群正在产生一些键和值。我的最终目标是以JSON格式写HDFS,为此我已经用Kafka HDFS Sink 5.3做了一段时间的实验。我面临的问题是,我无法将该主题的所有记录摄取并写入HDFS。到目前为止,如果我的主题包含每小时数百万条记录的数据,我只能写10万条记录。

以下是我用于kafka-connect-standalone.properties和HDFS的配置quickstart-hdfs.properties

value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
schema.enable=false

offset.flush.interval.ms=10000

group.id=x-hdfs-consumer-group
consumer.session.timeout.ms=10000
consumer.heartbeat.interval.ms=3000
consumer.request.timeout.ms=1810000
consumer.max.poll.interval.ms=1800000
name=hdfs-sink-mapr
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=10
topics=topic_name
hdfs.url=maprfs:///hive/x/poc_kafka_connect/
flush.size=20000
errors.tolerance=all 

format.class=io.confluent.connect.hdfs.json.JsonFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
timestamp.field=timestamp
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC

如果我不使用错误。公差=所有属性,那么我只制作了约500条记录。

就工人日志而言,我没有收到任何错误,所以我不确定我错过了什么。

由于我对Kafka连接器相对较新,并且已经尝试了一段时间,如果有人能对我做错了什么提供一些见解,我将不胜感激。

Kafka连接器也在2天内死亡。也就是说,它可以正常工作近 2 天,但一段时间后它停止读取数据并且不产生任何内容。我在独立模式下运行它,这可能是原因吗?我试着描述消费群体,似乎所有的消费者都死了。

kafka/kafka_2.12-2.3.0/bin/kafka-consumer-groups.sh --bootstrap-server <server>:9092 --describe --group connect-ajay-hdfs-sink-mapr
GROUP                       TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-ajay-hdfs-sink-mapr topic_name 21         1186755480      1187487551      732071          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 12         957021804       957736810       715006          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 17         957031965       957746941       714976          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 24         957496491       958212413       715922          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 0          956991807       957716202       724395          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 28         956940273       957668689       728416          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 5          957182822       957899308       716486          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 3          956974180       957695189       721009          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 19         956878365       957590196       711831          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 2          956968023       957685835       717812          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 16         957010175       957726139       715964          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 7          956900190       957624746       724556          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 8          957020325       957739604       719279          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 22         957064283       957788487       724204          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 29         957026931       957744496       717565          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 13         957400623       958129555       728932          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 6          956892063       957618485       726422          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 11         957117685       957841645       723960          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 1          957003873       957734649       730776          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 18         957007813       957734011       726198          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 27         957047658       957766131       718473          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 10         956975729       957689182       713453          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 15         957046441       957775251       728810          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 23         957011972       957727996       716024          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 14         957151628       957881644       730016          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 4          957118644       957845399       726755          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 9          957109152       957838497       729345          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 25         956923833       957646070       722237          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 26         957026885       957742112       715227          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 20         957010071       957733605       723534          -               -               -

共有1个答案

微生城
2023-03-14

若要将主题中的所有现有记录获取到接收器连接器,请将其添加到辅助角色属性并重新启动连接

consumer.auto.offset.reset=earliest

如果已启动连接器,则需要重置其使用者组,或更改配置中的名称以创建新组

 类似资料:
  • 我尝试使用最新的kafka (confluent-platform-2.11)连接将Json放到s3上。我在quickstart-s3.properties文件中设置format . class = io . confluent . connect . S3 . format . JSON . JSON format 和负载连接器: 然后我给Kafka发了一行: ~$ Kafka-控制台-生产者

  • 我有一个KStream与用户点击和使用userID作为关键也包含用户详细信息的KTable也使用userID作为关键。KStream和KTable都有一些分区,使用相同的分区策略和相同的键。 当我在这两个之间使用左连接时,大多数点击事件都没有与用户详细信息匹配,有一些匹配。但是当我用GlobalKTable更改KTable时,这些缺失的匹配会消失所有必需的用户点击都富含用户详细信息。 什么会导致这

  • 我有一个现有的2个kafka服务器加载了mysql连接器。它起作用了。此外,我需要添加MongoDB连接器。我已经在我的Kafka服务器(Centos7)上安装了confluent schema registry,它可以工作,我停止/启动/重新启动,看起来没有什么问题。我在这里下载并提取了debezium Mongo插件/usr/连接器/插件/debezium连接器mongodb/ 我编辑了 /e

  • kafka-python(1.0.0)在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer和 /usr/bin/kafka-console-consumer正常工作。 Python应用程序过去也运行良好,但是在动物园管理员重新启动后,它不再能够连接。 我使用文档中的裸露骨骼示例: 我收到这个错误: 单步通过( /usr/lib/python2.6/site-

  • 在与docker和kafka的基础上磕磕绊绊,无法获得客户端连接 到目前为止我所做的 docker-机器活动,不返回活动主机 我的groovy类(从一个示例中剪切和粘贴,连接如下所示 当我运行这个init时,我得到的错误是它不能解析连接,因为java.io.ioException:不能解析地址:7BF9F9278E64:9092,这是内部容器端口。(我的脚本正在从我的普通IDE桌面环境中调用) 感

  • 用例是将整个消息(JSON)和键作为记录存储在表中,表中有两列“id”和“data”。 数据库是Postgres,它支持JSON的列类型。 根据本文,JSONCon的支持类型是字符串、int64等https://cwiki.apache.org/confluence/display/KAFKA/KIP-301:JsonConver的模式推理 是否可以将数据字段的类型设置为JSON,然后将其存储在P