当前位置: 首页 > 知识库问答 >
问题:

我如何用Kafka发送大的消息(超过15MB)?

酆耀
2023-03-14

(异常出现在生产者中,我在这个应用程序中没有消费者。)

我该怎么做才能摆脱这个例外呢?

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}
4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

共有1个答案

戚同
2023-03-14

您需要调整三个(或四个)属性:

  • 使用者端:fetch.message.max.bytes-这将确定使用者可以获取的消息的最大大小。
  • 代理端:replica.fetch.max.bytes-这将允许代理中的副本在集群内发送消息,并确保正确复制消息。如果这太小,那么消息将永远不会被复制,因此,使用者将永远不会看到消息,因为消息将永远不会提交(完全复制)。
  • 代理端:message.max.bytes-这是代理可以从生产者接收的最大大小的消息。
  • 代理端(每个主题):max.message.bytes-这是代理允许附加到主题的最大消息大小。此大小是在压缩前验证的。(默认值为Broker的message.max.bytes。)

我发现了第二个问题--你不会从Kafka那里得到任何异常、消息或警告,所以当你发送大的消息时,一定要考虑到这一点。

 类似资料:
  • 如何保持一个连续的流,以“反应”新的丢弃的文件?(或其他事件,如HTTP GET请求或类似的事件)... 例如,如果我不返回PublisherBuilder的实例,而是返回一个整数,那么我的kafka主题将由一个非常巨大的整数值流填充。这就是为什么示例在发送消息时使用一些间隔... 我应该使用一些CompletationStage或CompletableFuture吗?RXJava2?使用哪个li

  • 我们正在使用spring kafka 1.2.2。释放 我们想要的 1.一旦消息被消费并成功处理,就会在spring-kafka中提交偏移量。 我正在使用Manaul Commit/Ac认收它,它工作正常。 2.在任何异常的情况下,我们希望spring-kafka重新发送相同的消息。 我们对任何系统误差抛出RunTime异常,它由spring-kafka记录并且从未提交。 这很好,因为我们不希望它

  • 如何在Kafka中发送同步消息 实现这一点的一种方法是设置properties参数 。 但是我想知道是否有一种甚至直接或替代的方式在Kafka中发送同步消息。(比如producer.sync发送(...)等等)。

  • 我有一个应用程序,它定期生成原始JSON消息数组。我能够使用avro-tools将其转换为Avro。我这样做是因为由于Kafka-Connect JDBC接收器的限制,我需要消息包含模式。我可以在记事本上打开这个文件,看到它包括模式和几行数据。 现在,我想将其发送到我的中央Kafka代理,然后使用Kafka Connect JDBC接收器将数据放入数据库。我很难理解我应该如何将这些Avro文件发送

  • 如何使用新的Spring Cloud Stream Kafka功能模型发送消息? 不推荐的方式是这样的。 但是我如何以函数式风格发送消息呢? 应用yml公司 我会自动连接MessageChannel,但对于process、process-out-0、output或类似的东西,没有MessageChannel Bean。或者我可以用供应商Bean发送消息吗?谁能给我举个例子吗?谢谢!

  • 我想一次通过udp客户端发送许多udp消息,但是演示只发送一个message.how我能实现吗? 使用演示代码,我只能发送有限数量的消息。我想用while(true)来发送消息,我该如何实现呢? public static void main(String[]args){Connection Connection=UdpClient.create().host(“localhost”).port(