“发送是异步的,一旦记录存储在等待发送的记录缓冲区中,此方法将立即返回。这允许并行发送许多记录,而不会阻塞等待每个记录之后的响应。“
我只是想知道这些记录是如何并行发送的?如果我有3个代理,并且在同一主题下的每个代理上有3个分区,Kafka生产者会将记录并行地发送到9个分区吗?或者制作人只是并行地向3个经纪人发送唱片?生产者如何以平行的方式工作?
Kafka客户机使用org.apache.Kafka.common.requests.produceRequest
,它可以同时为多个分区承载有效负载(请参见http://Kafka.apache.org/protocol.html#the_messages_produce)。
因此,它将(使用org.apache.kafka.clients.networkclient
)三个请求并行发送到(三个)代理中的每一个,即:
- sends records for topic-partition0, topic-partition1, topic-partition2 to broker 1
- sends records for topic-partition3, topic-partition4, topic-partition5 to broker 2
- sends records for topic-partition6, topic-partition7, topic-partition8 to broker 3
您可以控制使用生产者配置完成多少批处理。
前期回顾 其中channel.finishConnect()中完成建立连接,调用了 sender的run(),继续分析 其中步骤五和步骤七: 会把发往同个broker上面partition的数据组合成为一个请求,然后统一一次发送过去,这样子就减少了网络请求。调用send() 调用selector的send() 调用kafkachannel的setsend() 开始发送数据 sender里面的pol
我使用的是Kafka producer客户端,我的项目中没有任何log4j配置。 在运行时,程序打印了大量的Kafka调试日志,这是我不想要的。
如果我只在生产者端发送一条记录并等待,生产者何时将记录发送给经纪人?在Kafka文档中,我找到了名为“linger.ms”的配置,它说: 一旦我们得到 根据以上文件,我有两个问题。 > 如果生产者收到的数据达到batch.size,它会立即触发发送一个只包含一个批次的请求给代理?但是正如我们所知,一个请求可以包含许多批次,那么它是如何发生的呢? 这是否意味着即使是收到的数据也不足以批量处理。大小,
我在Java日志记录方面遇到了一些障碍。我想将日志发送到笔记本电脑上的Nginx本地服务器。我如何使用log4j或slf4j实现这个函数,有人能帮我实现代码吗?谢谢
我试图用Java8中的流创建一个生产者多个消费者模型。我正在从数据库资源中读取和处理数据,我想以流式方式处理它们(不能将整个资源读取到内存中)。
我是新的Flink流处理,并需要一些帮助与FlinkKafka生产者,因为不能找到很多相关的搜索后一段时间。我目前正在阅读一个Kafka主题的流,然后在执行一些计算后,我想把这个写到新的Kafka中的一个分离主题。但我面临的问题是,我无法发送Kafka主题的关键。我使用的是Flink Kafka连接器,它给了我FlinkKafkaConsumer和flinkkafkaProducer。更详细的查看