假设我有两个经纪人。
我读到Kafka制作人创建的制作人线程等于经纪人的数量。在这种情况下,我将有两个内部线程。
假设我有5个主题,每秒只收到200条消息。Kafka如何进行批处理?
一批大小=30条消息。[topic1=5,topic2=10,topic3=3,topic4=10,topic5=2消息]这些是最重要的消息和相应的主题。
Kafka是如何执行批处理的?
记录累加器负责批量处理发送给Kafka经纪人的值。
- maintains a ConcurrentMap<TopicPartition, Deque<RecordBatch>>
- gets Deque of relevant TopicPartition
- gets last RecordBatch present in Deque, if RecordBatch(bytebuffer bounded by batch.size) is not full appends value to the RecordBatch
- if last RecordBatch is null, no RecordBatch exists for the relevant topic partition hence allocates a new byte buffer
- does a double check locking on last RecordBatch again, incase some other thread might have created the RecordBatch
- if RecordBatch exists, tries appending the value
- if still RecordBatch is null, creates MemoryRecords (backed by byte buffer)
- adds MemoryRecords to RecordBatch
- appends value to RecordBatch ( inside MemoryRecords eventually Byte Buffer )
- adds RecordBatch to Deque
类层次结构:
RecordBatch
- MemoryRecords
- ByteBuffer
我读到Kafka制作人创建的制作人线程等于经纪人的数量。在这种情况下,我将有两个内部线程。
不确定,你是从哪里得到这些信息的,但这是不正确的。一个KafkaProducer
确实有一个后台线程来异步地向代理写入数据。
配料是如何发生的,很难详细预测。这取决于你的批次。大小(这是一个最大值)。此外,还有linger。ms
参数,用于定义在发送数据之前保留数据的时间(即使批次未满)。
更详细地说,将有到托管您写入的分区的所有代理的开放网络连接。此外,批处理是基于分区的——但是,多个批处理可以包含在对代理的单个请求中。
一、生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。 接下来,数据被传给分区器。如果之前已经在 Prod
Kafka文件说,幂等生产者是可能的,与相同的生产者会话,我无法理解这一点。 比方说,Kafka为每条消息添加序列号,最后一个序列号在Kafka中维护(不确定它维护在哪里)。 它是如何生成序列号的,它保存在哪里? 为什么当制作人崩溃并再次出现时,它不能保持序列? 我怎样才能使它在制作人会话之间真正幂等?
Kafka为每条消息生成偏移量。假设,我正在生成消息5,偏移量将从1到5。 但是,在事务生产者中,比如说,我产生了5条消息并提交,然后是5条消息但中止,然后是5条消息提交。 > 那么,最后提交的5条消息的偏移量是6到10还是11到15? 如果我不放弃或不promise呢。这些信息还会被发布吗? Kafka是如何忽略未promise的补偿的?因此,kafka提交日志是基于偏移量的。它是否使用事务使用
我正在使用一个非事务性生产者,并试图理解如何处理成功/失败场景的回调。 对于一个成功的发送,我看到回调由kafka-producer-network-thread线程执行(“send ok”消息)。 发送消息成功-kafka-producer-network-thread 00:59:17.522
这是我的密码。
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?