我正在阅读一个已经创建的Kafka主题,在这个主题上,一个单独的集群正在产生一些键和值。我的最终目标是以JSON格式写HDFS,为此我已经用Kafka HDFS Sink 5.3做了一段时间的实验。我面临的问题是,我无法将该主题的所有记录摄取并写入HDFS。到目前为止,如果我的主题包含每小时数百万条记录的数据,我只能写10万条记录。
以下是我用于kafka-connect-standalone.properties和HDFS的配置quickstart-hdfs.properties
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
schema.enable=false
offset.flush.interval.ms=10000
group.id=x-hdfs-consumer-group
consumer.session.timeout.ms=10000
consumer.heartbeat.interval.ms=3000
consumer.request.timeout.ms=1810000
consumer.max.poll.interval.ms=1800000
name=hdfs-sink-mapr
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=10
topics=topic_name
hdfs.url=maprfs:///hive/x/poc_kafka_connect/
flush.size=20000
errors.tolerance=all
format.class=io.confluent.connect.hdfs.json.JsonFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
timestamp.field=timestamp
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC
如果我不使用错误。公差=所有属性,那么我只制作了约500条记录。
就工人日志而言,我没有收到任何错误,所以我不确定我错过了什么。
由于我对Kafka连接器相对较新,并且已经尝试了一段时间,如果有人能对我做错了什么提供一些见解,我将不胜感激。
Kafka连接器也在2天内死亡。也就是说,它可以正常工作近 2 天,但一段时间后它停止读取数据并且不产生任何内容。我在独立模式下运行它,这可能是原因吗?我试着描述消费群体,似乎所有的消费者都死了。
kafka/kafka_2.12-2.3.0/bin/kafka-consumer-groups.sh --bootstrap-server <server>:9092 --describe --group connect-ajay-hdfs-sink-mapr
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-ajay-hdfs-sink-mapr topic_name 21 1186755480 1187487551 732071 - - -
connect-ajay-hdfs-sink-mapr topic_name 12 957021804 957736810 715006 - - -
connect-ajay-hdfs-sink-mapr topic_name 17 957031965 957746941 714976 - - -
connect-ajay-hdfs-sink-mapr topic_name 24 957496491 958212413 715922 - - -
connect-ajay-hdfs-sink-mapr topic_name 0 956991807 957716202 724395 - - -
connect-ajay-hdfs-sink-mapr topic_name 28 956940273 957668689 728416 - - -
connect-ajay-hdfs-sink-mapr topic_name 5 957182822 957899308 716486 - - -
connect-ajay-hdfs-sink-mapr topic_name 3 956974180 957695189 721009 - - -
connect-ajay-hdfs-sink-mapr topic_name 19 956878365 957590196 711831 - - -
connect-ajay-hdfs-sink-mapr topic_name 2 956968023 957685835 717812 - - -
connect-ajay-hdfs-sink-mapr topic_name 16 957010175 957726139 715964 - - -
connect-ajay-hdfs-sink-mapr topic_name 7 956900190 957624746 724556 - - -
connect-ajay-hdfs-sink-mapr topic_name 8 957020325 957739604 719279 - - -
connect-ajay-hdfs-sink-mapr topic_name 22 957064283 957788487 724204 - - -
connect-ajay-hdfs-sink-mapr topic_name 29 957026931 957744496 717565 - - -
connect-ajay-hdfs-sink-mapr topic_name 13 957400623 958129555 728932 - - -
connect-ajay-hdfs-sink-mapr topic_name 6 956892063 957618485 726422 - - -
connect-ajay-hdfs-sink-mapr topic_name 11 957117685 957841645 723960 - - -
connect-ajay-hdfs-sink-mapr topic_name 1 957003873 957734649 730776 - - -
connect-ajay-hdfs-sink-mapr topic_name 18 957007813 957734011 726198 - - -
connect-ajay-hdfs-sink-mapr topic_name 27 957047658 957766131 718473 - - -
connect-ajay-hdfs-sink-mapr topic_name 10 956975729 957689182 713453 - - -
connect-ajay-hdfs-sink-mapr topic_name 15 957046441 957775251 728810 - - -
connect-ajay-hdfs-sink-mapr topic_name 23 957011972 957727996 716024 - - -
connect-ajay-hdfs-sink-mapr topic_name 14 957151628 957881644 730016 - - -
connect-ajay-hdfs-sink-mapr topic_name 4 957118644 957845399 726755 - - -
connect-ajay-hdfs-sink-mapr topic_name 9 957109152 957838497 729345 - - -
connect-ajay-hdfs-sink-mapr topic_name 25 956923833 957646070 722237 - - -
connect-ajay-hdfs-sink-mapr topic_name 26 957026885 957742112 715227 - - -
connect-ajay-hdfs-sink-mapr topic_name 20 957010071 957733605 723534 - - -
若要将主题中的所有现有记录获取到接收器连接器,请将其添加到辅助角色属性并重新启动连接
consumer.auto.offset.reset=earliest
如果已启动连接器,则需要重置其使用者组,或更改配置中的名称以创建新组
我尝试使用最新的kafka (confluent-platform-2.11)连接将Json放到s3上。我在quickstart-s3.properties文件中设置format . class = io . confluent . connect . S3 . format . JSON . JSON format 和负载连接器: 然后我给Kafka发了一行: ~$ Kafka-控制台-生产者
我有一个KStream与用户点击和使用userID作为关键也包含用户详细信息的KTable也使用userID作为关键。KStream和KTable都有一些分区,使用相同的分区策略和相同的键。 当我在这两个之间使用左连接时,大多数点击事件都没有与用户详细信息匹配,有一些匹配。但是当我用GlobalKTable更改KTable时,这些缺失的匹配会消失所有必需的用户点击都富含用户详细信息。 什么会导致这
我有一个现有的2个kafka服务器加载了mysql连接器。它起作用了。此外,我需要添加MongoDB连接器。我已经在我的Kafka服务器(Centos7)上安装了confluent schema registry,它可以工作,我停止/启动/重新启动,看起来没有什么问题。我在这里下载并提取了debezium Mongo插件/usr/连接器/插件/debezium连接器mongodb/ 我编辑了 /e
kafka-python(1.0.0)在连接到代理时抛出错误。同时 /usr/bin/kafka-console-producer和 /usr/bin/kafka-console-consumer正常工作。 Python应用程序过去也运行良好,但是在动物园管理员重新启动后,它不再能够连接。 我使用文档中的裸露骨骼示例: 我收到这个错误: 单步通过( /usr/lib/python2.6/site-
在与docker和kafka的基础上磕磕绊绊,无法获得客户端连接 到目前为止我所做的 docker-机器活动,不返回活动主机 我的groovy类(从一个示例中剪切和粘贴,连接如下所示 当我运行这个init时,我得到的错误是它不能解析连接,因为java.io.ioException:不能解析地址:7BF9F9278E64:9092,这是内部容器端口。(我的脚本正在从我的普通IDE桌面环境中调用) 感
用例是将整个消息(JSON)和键作为记录存储在表中,表中有两列“id”和“data”。 数据库是Postgres,它支持JSON的列类型。 根据本文,JSONCon的支持类型是字符串、int64等https://cwiki.apache.org/confluence/display/KAFKA/KIP-301:JsonConver的模式推理 是否可以将数据字段的类型设置为JSON,然后将其存储在P