当前位置: 首页 > 知识库问答 >
问题:

组织。阿帕奇。Kafka。常见的错误。水槽Kafka水槽中的RecordToolArge异常

章安宜
2023-03-14

我试图从JMS源读取数据,并将它们推送到KAFKA主题中,几个小时后,我观察到推送到KAFKA主题的频率几乎为零,经过一些初步分析,我在FLUME日志中发现以下异常。

28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
        ... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

my flume显示max.request的当前设置值(在日志中)。尺寸为1048576,明显小于1399305,增加了此最大要求。大小可能会消除这些异常,但我无法找到更新该值的正确位置。

我的水槽。配置,

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.channels.c1.type = file
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 100000000
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data

a1.sources.r1.type = jms

a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = true

a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = some context urls
a1.sources.r1.connectionFactory = some_queue
a1.sources.r1.providerURL = some_url 
#a1.sources.r1.providerURL = some_url
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.destinationName = some_queue_name 
a1.sources.r1.userName = some_user
a1.sources.r1.passwordFile= passwd

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = some_kafka_topic
a1.sinks.k1.kafka.bootstrap.servers = some_URL
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 1
a1.sinks.k1.channel = c1

任何帮助都将不胜感激!!

共有2个答案

澹台建华
2023-03-14

看来我已经解决了我的问题;由于怀疑增加了max.request。size消除了异常,对于更新此类Kafka水槽(生产商)属性,FLUME提供了常量前缀,如Kafka。制作人。我们可以用任何Kafka属性附加这个常量前缀;

所以我的是,a1。下沉。k1。Kafka。制作人最大请求。尺寸=5271988

彭允晨
2023-03-14

这种改变必须在Kafka进行。更新Kafka生产者配置文件producer。属性,具有较大的值,如

max.request.size=10000000
 类似资料:
  • 我有两个代理1.0.0Kafka集群,我正在针对这个Kafka运行1.0.0Kafka流API应用程序。我增加了制片人的要求。暂停。毫秒到5分钟来修复生产者超时异常。 目前,在运行一段时间后,我发现以下两种类型的异常。我试图按照ApacheKafka中的建议修复这些异常:TimeoutException,然后什么都不起作用‏ 但不完整的解决方案就在这里。建议使用此解决方案(减少生产批量)。请帮忙。

  • 我测试了Apache Flume将文件从本地传输到HDFS。但是,如果源文件来自多个服务器(将文件从不同服务器的本地传输到HDFS),我可以只运行一个Flume实例并将更多代理添加到Flume-conf.property中吗? 如果可以,如何在 flume-conf.properties 中编辑以下参数: 还有,我怎样才能运行水槽? 只能运行一个水槽。两个以上呢?

  • 我正在使用 flume 1.8.0,它会自动关闭,任何人都可以帮助我。 在 JAVA 8 上运行 JAVA_OPTS=“-服务器 -Xms4g -Xmx4g” 配置在水槽中 使用以下命令启动 flume ./bin/flume-ng agent --conf conf --conf-file ./conf/flume-conf-postgresql.properties --name dfm-to

  • 我有一个Flink 1.11作业,它使用来自Kafka主题的消息,键入它们,过滤它们(keyBy后跟自定义ProcessFunction),并通过JDBC接收器将它们保存到db中(如下所述:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html) Kafka消费者使用以下选项初始化:

  • 我试图配置水槽与HDFS作为汇。 这是我的flume.conf文件: 我的hadoop版本是: 水槽版本是: 我已将这两个jar文件放在flume/lib目录中 我将hadoop common jar放在那里,因为在启动flume代理时出现以下错误: 现在代理开始了。这是启动日志: 但是当一些事件发生时,下面的错误出现在水槽日志中,并且没有任何东西被写入hdfs。 我缺少一些配置或jar文件?

  • 我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法