当前位置: 首页 > 知识库问答 >
问题:

批量发布Kafka对象

慕容铭
2023-03-14

我用的是Kafka普勒。send()方法将记录发布到Kafka。这是一种异步方法。我的申请将向Kafka发布大约2万张唱片。在成功发送大约10k条记录后,我多次收到超时异常。我增加了一批。大小和逗留。ms减少了问题,但有时我仍然会遇到超时异常。有没有办法一次只发送10k条记录,等待它们完成,然后再发送下一批记录??我不想通过使用send使其同步。get()因为这会让它变得非常慢。这是我得到的例外。

Error publishing object on partition 0 org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)

共有1个答案

东门涵育
2023-03-14

您是否尝试将retries=3设置为生产者配置
如果记录过期,您可能不应该增加批处理。大小但要减小。

 类似资料:
  • 问题内容: 我已经开始尝试使用Django REST框架。到目前为止,我已经成功地为我的对象创建了一个序列化程序,通过Javascript的$ .post()创建了发布视图,发布对象和返回对象。因此,现在我可以在JSON和Django模型对象之间进行适当的转换。 问题是我有一个对象数组[A1,A2,…,An]。现在,当我需要发布这样的数组时,我逐个对象地进行处理。是否有可能一次发布整个数组,并在D

  • 高级使用者 API 似乎一次读取一条消息。 如果消费者想要处理这些消息并提交给其他下游消费者(如Solr或Elastic-Search ),这可能会给他们带来很大的问题,因为他们更喜欢批量接收消息,而不是一次接收一条。 在内存中批处理这些消息也并非易事,因为只有当批处理已经提交时,Kafka中的偏移量也需要同步,否则具有未提交下游消息的崩溃的 kafka 使用者(如在Solr或ES中)将已经更新其

  • 我是spark streaming的新手,我有一个关于其用法的一般性问题。我目前正在实现一个应用程序,它从一个Kafka主题流式传输数据。 使用应用程序只运行一次批处理是一种常见的场景吗,例如,一天结束,收集主题中的所有数据,做一些聚合和转换等等? 这意味着在用spark-submit启动应用程序后,所有这些东西将在一批中执行,然后应用程序将被关闭。或者spark stream build是为了在

  • 我有一个关于优化kafka异步生产者吞吐量的问题:配置批处理。大小和逗留。ms在使用异步生成器时会产生影响吗? 我这样问是因为我认为这些参数只会影响同步生产者,因为它将等待代理确认。在异步生产者的情况下,这不会产生影响吗? 此外,是否有任何配置参数可以优化异步生成器?

  • 请求参数说明 参数 描述 必填 示例值 类型 最大长度 action 接口参数组 是 object └action 需要调用的接口名称 是 mb_newcard string get GET参数组,本组参数需要参与签名 是 object └type 发卡模式(不填默认为2,2为发通卡模式 15为储值卡模式) 否 15 number └balance 储值余额 否 100 number └count

  • 一种写法同时支持 Curl 和 Swoole use \Yurun\Util\YurunHttp\Co\Batch; use \Yurun\Util\HttpRequest; $result = Batch::run([ (new HttpRequest)->url('https://www.imiphp.com'), (new HttpRequest)->url('https: