当前位置: 首页 > 知识库问答 >
问题:

如何配置Kafka Connect Worker以将大量消息流式传输到HDF

应俭
2023-03-14

我当前的工作设置:

NiFi将Avro消息(ConFluent Schema注册表参考)流式传输到Kafka(v2.0.0,20个分区,ConFluent v5.0.0),Kafka Connect Worker(HDFS接收器)将这些消息以Parket格式流式传输到HDFS,flush.size=70000

我的问题:

这个配置很好,但当我将配置更改为flush时。大小=1000000(因为70k条消息的最大大小为5-7MB,但拼花文件块大小为256MB)connect worker返回发送提取请求时出错:

...
[2019-05-24 14:00:21,784] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1661483807, epoch=374) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-05-24 14:00:21,784] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={mytopic-10=(offset=27647797, logStartOffset=24913298, maxBytes=1048576), mytopic-16=(offset=27647472, logStartOffset=24913295, maxBytes=1048576), mytopic-7=(offset=27647429, logStartOffset=24913298, maxBytes=1048576), mytopic-4=(offset=27646967, logStartOffset=24913296, maxBytes=1048576), mytopic-13=(offset=27646404, logStartOffset=24913298, maxBytes=1048576), mytopic-19=(offset=27648276, logStartOffset=24913300, maxBytes=1048576), mytopic-1=(offset=27647036, logStartOffset=24913307, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1661483807, epoch=374)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
...

我的配置:

HDFS连接器配置:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
tasks.max=1
topics=mytopic
hdfs.url=hdfs://hdfsnode:8020/user/someuser/kafka_hdfs_sink/
flush.size=1000000

Kafka Connect Worker配置:

bootstrap.servers=confleuntnode1:9092,confleuntnode2:9092,confleuntnode3:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://confleuntnode:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/confluent/current/share/java/

我的问题:

如何使用Kafka Connect Worker将更大尺寸的消息从Kafka流到HDFS?

共有1个答案

靳越
2023-03-14

我通过在分布式模式(而不是独立模式)下运行connect解决了这个问题。现在,我可以向HDFS写入多达350万条记录(约256 mb)。但这也带来了一个新问题:1)处理速度非常慢(1小时内有3500万条记录);2) 无法写入大于256 Mb的拼花文件。我将发布一个新的SO问题。

 类似资料:
  • 问题内容: 看起来Spring 不能将响应直接流式传输到文件而不将其全部缓存在内存中。使用较新的Spring 5实现此目标的合适方法是什么? 我看到人们已经找到了解决此问题的一些变通方法/技巧,但是我对使用正确的方法更感兴趣。 有许多用于下载二进制数据的示例,但几乎所有示例都将其加载到内存中。 问题答案: 使用最近稳定的Spring WebFlux(截至撰写时为5.2.4.RELEASE): 对我

  • Spring似乎无法将响应直接流式传输到文件,而不将其全部缓冲在内存中。使用较新的Spring 5实现这一点的正确方法是什么? 我看到人们在中找到了一些解决此问题的方法,但我更感兴趣的是使用以正确的方式解决此问题。 有许多使用下载二进制数据的示例,但几乎所有示例都将加载到内存中。

  • 问题内容: 使用MVC模型,我想编写一个JsonResult,它将Json字符串流式传输到客户端,而不是一次将所有数据转换成Json字符串,然后将其流回客户端。我有一些动作需要在Json传输时发送非常大的记录(超过300,000条记录),我认为基本的JsonResult实现是不可伸缩的。 我正在使用Json.net,我想知道是否有一种方法可以在转换Json字符串时流化它的块。 但是我不确定如何将这

  • 我目前正在尝试轻松地将消息从一个Kafka集群上的主题流式传输到另一个集群(远程)- 所以假设WordCount演示在另一台PC上的一个Kafka-Instance上,而不是我自己的PC上。我也有一个Kafka-Instance在我的本地机器上运行。 现在我想让WordCount演示在包含应该计算单词的句子的Topic(“远程”)上运行。 然而,计数应该写入我本地系统上的Topic而不是“远程”T

  • 问题 如何流传输大文件? 方案 要流传输大文件,需要添加传输译码(Transfer-Encoding)区块头,这样才能一边下载一边显示。否则,浏览器将缓冲所有数据直到下载完毕才显示。 如果这样写:直接修改基础字符串(例中就是j),然后用 yield 返回--是没有效果的。如果要使用 yield,就要向对所有内容使用 yield。因为这个函式此时是一个生成器。(注:具体细节请查看 yield 文档,

  • 问题内容: 我正在尝试设置一个非常基本的html5页面,该页面会加载20MB的.mp4视频。看来,浏览器需要下载整个内容,而不仅仅是播放视频的第一部分并在其余部分进行流传输。 我在搜索时发现的最接近的内容…我尝试了“手刹”和“数据回合”,两者均未发挥作用: 关于如何执行此操作或是否可行的任何想法? 这是我正在使用的代码: 问题答案: 确保moov(元数据)在mdat(音频/视频数据)之前。这也称为