当前位置: 首页 > 知识库问答 >
问题:

如何有效地从集合中生成消息给Kafka

连文栋
2023-03-14

在我的Scala(2.11)流应用程序中,我正在使用IBM MQ中的一个队列中的数据,并将其写入一个具有一个分区的Kafka主题。在使用MQ的数据之后,消息负载被拆分为3000个较小的消息,这些消息存储在字符串序列中。然后使用KafkaProducer将这3000条消息中的每一条发送到Kafka(2.x)。

你怎么发那3000条信息?

我不能增加IBM MQ中的队列数(不在我的控制之下),也不能增加主题中的分区数(需要对消息进行排序,编写自定义分区器会影响主题的过多使用者)。

制作人设置当前为:

  • ACKS=1
  • Linger.ms=0
  • 批处理.size=65536

但优化它们可能是它自己的问题,而不是我当前问题的一部分。

目前,我正在做

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
    val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
    val recordMetadata = future.get()
}

在我看来,这不是最优雅和最有效的方式。是否有一种编程方式来提高吞吐量?

多亏这个答案给我指明了正确的方向,我对不同的生产者方法进行了更仔细的研究。《Kafka--权威指南》一书列出了这些方法:

Fire-and-forget我们向服务器发送一条消息,并不关心它是否成功到达。大多数时候,它会成功到达,因为Kafka是高度可用的,并且制作人会重新尝试自动发送消息。但是,使用此方法会丢失一些消息。

同步发送我们发送一个消息,send()方法返回一个Future对象,我们使用get()等待Future并查看send()是否成功。

异步发送我们用一个回调函数调用send()方法,当它收到来自Kafka代理的响应时,它会被触发

现在我的代码看起来是这样的(省略了错误处理和回调类的定义):

  val asyncProducer = new KafkaProducer[String, String](someProperties)

  for (msg <- messages) {
    val record = new ProducerRecord[String, String](someTopic, someKey, msg)
    asyncProducer.send(record, new compareProducerCallback)
  }
  asyncProducer.flush()

我已经比较了10000个非常小的消息的所有方法。下面是我的测量结果:

>

  • 触发即忘:173683464ns

    同步发送:29195039875NS

    异步发送:44153826ns

    老实说,通过选择正确的属性(batch.size、linger.ms、...),可能有更大的潜力来优化它们。

  • 共有1个答案

    姬歌者
    2023-03-14

    我能看出您的代码慢的最大原因是您在等待每一个发送的未来。

    卡夫卡被设计用来发送成批。通过一次发送一个记录,您等待每个记录的往返时间,您没有从压缩中得到任何好处。

    要做的“惯用的”事情是发送所有内容,然后在第二个循环中阻止所有产生的未来。

    此外,如果您打算这样做,我会将linger备份(否则,您的第一条记录将导致一个大小为1的批处理,从而降低您的总体速度。请参阅https://en.wikipedia.org/wiki/nagle%27s_algorithm)并在发送循环完成后调用生产者的flush()。

     类似资料:
    • 请我需要流构建器小部件返回这最后一条消息(红色矩形框)在我的消息集合。我在下面使用的返回空字符串的方法必须在Text小部件中提供。 当我试图从消息集合下的最后一个消息字段获取消息时,它返回的错误(即必须在Text小部件中提供空字符串)发生在Text(message.message)处。

    • 问题内容: 我有一个n维数组,如下所示: 在此数组中,元素表示低值和高值。例如:指 我需要使用上面给出的范围生成所有值的组合。例如我想要 我已经尝试了以下方法来获得想要的东西: 笛卡尔函数取自使用numpy来构建两个数组的所有组合的数组 我需要做 几百万遍 。 我的问题:是否有 更好/有效的 方法来做到这一点? 问题答案: 我认为您正在寻找的是。不幸的是,这会以与所需格式不同的格式返回数组,因此您

    • 我有以下问题: 我在aws上只使用1个worker和broker节点建立了一个尽可能基本的汇合平台。 我对所有与人脉和关系网有关的事情都很敏感,所以我希望有人能帮助我

    • 所以我试图生成一个数组,其中填充了唯一的随机整数,我发现用arraylist来实现这一点是最有效的方法。 现在我试着使用但我不太确定括号里应该放什么,也不确定这是否真的能起作用。有没有其他转换方法,因为我不能简单地通过。

    • 问题内容: 我正在尝试使用 生成器 在Python中构建给定集合的子集列表。说我有 作为输入,我应该有 作为输出。我该如何实现? 问题答案: 最快的方法是使用itertools,尤其是链和组合: 如果需要生成器,只需使用yield并将元组变成集合: 然后简单地:

    • 我正在开发一个使用Spring Integration 5.0.1和Spring Boot 2.0.0的应用程序。RC1 目前,应用程序响应并运行一些可能需要一段时间才能完成的初始化代码。这不使用任何Spring集成组件。 我还有一些非常基本的集成流,使用JavaDSL编写,并在配置中声明为bean。 有什么方法可以推迟流何时开始消耗消息吗?我希望能够在初始化完成时手动启动它们。 配置似乎是解决方