当前位置: 首页 > 知识库问答 >
问题:

使用Scala的Kafka Producer无法进行批处理

归德厚
2023-03-14

我正在用Scala编写一个制作人,我想做批处理。批处理的工作方式是,它应该将消息保持在队列中,直到消息已满,然后将所有消息一起发布到主题上。但不知怎么的,它不起作用了。从我开始发送消息的那一刻起,它就开始一个接一个地发布消息。有人知道如何在Kafka Producer中使用批处理吗。

val kafkaStringSerializer = "org.apache.kafka.common.serialization.StringSerializer"
      val batchSize: java.lang.Integer = 163840
      val props = new Properties()
      props.put("key.serializer", kafkaStringSerializer)
      props.put("value.serializer", kafkaStringSerializer)
      props.put("batch.size", batchSize);
      props.put("bootstrap.servers", "localhost:9092")

      val producer = new KafkaProducer[String,String](props)

      val TOPIC="topic"
      val inlineMessage = "adsdasdddddssssssssssss"

      for(i<- 1 to 10){
        val record: ProducerRecord[String, String] = new ProducerRecord(TOPIC, inlineMessage )
        val futureResponse: Future[RecordMetadata] =  producer.send(record)
        futureResponse.isDone
        println("Future Response ==========>" + futureResponse.get().serializedValueSize())
      }

共有2个答案

裴楚青
2023-03-14

您正在同步向Kafka服务器生成数据。意思是,当你呼叫制作人时。发送带有未来响应的。get,只有在数据存储在Kafka服务器中后才会返回。

将响应存储在单独的列表中,并在循环的之外调用FutureResponse.get

在默认配置下,Kafka支持批处理,请参见linger。ms和批处理。尺寸

List<Future<RecordMetadata>> responses = new ArrayList<>();
for (int i=1; i<=10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, inlineMessage);
    Future<RecordMetadata> response = producer.send(record);
    responses.add(response);
}

for (Future<RecordMetadata> response : responses) {
    response.get(); // verify whether the message is sent to the broker.
}

齐俊贤
2023-03-14

你必须在道具中设置linger.ms

默认情况下,它为零,这意味着如果可能的话,消息会立即发送。您可以增加它(例如100),以便批量发生——这意味着更高的延迟,但更高的吞吐量。

批处理。大小是最大值:如果在逗留之前达到该值。ms已通过,数据将在不等待更多时间的情况下发送。

要查看实际发送的批,您需要配置日志记录(批处理在后台线程上完成,您将无法查看使用producer api完成的批处理-您无法发送或接收批处理,只能发送记录并接收其响应,通过批处理与代理的通信在内部完成)

首先,如果尚未完成,请绑定一个log4j属性文件(Dlog4j.configuration=file:path/to/log4j.properties

log4j.rootLogger=WARN, stderr
log4j.logger.org.apache.kafka.clients.producer.internals.Sender=TRACE, stderr

log4j.appender.stderr=org.apache.log4j.ConsoleAppender
log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.stderr.Target=System.err

例如,我会收到

TRACE Sent produce request to 2: (type=ProduceRequest, magic=1, acks=1, timeout=30000, partitionRecords=({test-1=[(record=LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, crc=2237306008, CreateTime=1502444105996, key=0 bytes, value=2 bytes))), (record=LegacyRecordBatch(offset=1, Record(magic=1, attributes=0, compression=NONE, crc=3259548815, CreateTime=1502444106029, key=0 bytes, value=2 bytes)))]}), transactionalId='' (org.apache.kafka.clients.producer.internals.Sender)

这是一批2个数据。批处理将包含发送到同一代理的记录

然后,玩batch.size和linger.ms看看区别。请注意,记录包含一些开销,因此1000的batch.size不会包含10条大小为100的消息

注意,我没有找到说明所有记录器及其功能的文档(比如log4j.logger.org.apache.kafka.clients.producer.internals.Sender)。您可以在rootLogger上启用DEBUG/TRACE并找到所需的数据,或者浏览代码

 类似资料:
  • 问题内容: 从文档 如果遇到需要插入1000 000行/对象的情况: 为什么我们应该使用这种方法?与StatelessSession一相比,它给我们带来了什么好处: 我的意思是,这个(“替代”)最后一个示例不使用内存,不需要进行同步,清除缓存,那么对于这样的情况,这应该是最佳实践吗?那么为什么要使用前一个呢? 问题答案: 从文档中,您链接到: 特别是,无状态会话不会实现第一级缓存,也不会与任何第二

  • 问题内容: 在我的应用程序中,我需要执行很多插入操作。它是一个Java应用程序,我正在使用普通的JDBC执行查询。该数据库是Oracle。我已经启用了批处理,因此它节省了我执行查询所需的网络等待时间。但是查询作为独立的INSERT顺序执行: 我想知道以下形式的INSERT是否可能更有效: 即将多个INSERT折叠成一个。 还有其他使批处理INSERT更快的技巧吗? 问题答案: 这是前两个答案的混合

  • 您好,我是Spring batch的新手,我遇到了以下无法解决的异常: 此处为我的代码: 我可以通过添加

  • 我正试图弄清楚如何使用Spring Batch进行聚合。例如,我有一个带有姓名列表的CSV文件: 我想要文本文件中的姓名计数: 根据我从Spring Batch中学到的,ETL批处理过程(itemReader- Spring Batch是正确的工具吗?还是我应该用Spark?谢谢

  • 我试图在Spark中创建成批的行。为了保持发送到服务的记录数量,我想对项目进行批处理,这样我就可以保持数据发送的速率。对于, 对于给定的我想创建 例如,如果输入有100条记录,那么输出应该像一样,其中每个应该是记录(Person)的列表。 我试过了,但没用。 我想在Hadoop集群上运行此作业。有人能帮我吗?

  • 问题内容: 我想知道如何使用MyBatis 3和Spring 3通过我的插入语句实现批处理操作? 例如,这是当前正在执行的操作: spring.xml: MyService.xml: MyService.java: MyController.java: 免责声明:这只是用于演示目的的伪代码 那么我该怎么做才能将其变成批处理流程呢? 理想情况下,我希望能够以最少的“侵入”代码来做到这一点,即更优先使