spring.kafka.listener.concurrency
@KafkaListener.concurrency
仅在多partition对应单个消费端
时,用于多线程消费消息
(concurrency <= partition数量),
当存在多个消费端
时,优先考虑让新的消费端去消费(而不是多线程,即便设置concurrency > 1也仅有唯一消费线程生效),
所以如果消费端个数本身就大于等于分区数了,那么concurrency这个参数再设置大于1就是浪费(可保持默认1
)
spring.kafka.consumer.enable-auto-commit
spring.kafka.conusmer.auto-commit-interval
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.ack-count
ack-mode列表(详细说明参见:SpringKafka - Committing Offsets)
ack-mode模式 | 说明 | 自动提交 |
---|---|---|
RECORD 单记录 | 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交 | ✔️ |
BATCH(默认) 批量 | 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交 | ✔️ |
TIME 超时 | 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交 (通过spring.kafka.listener.ack-time设置触发时间) | ✔️ |
COUNT 超过消费数量 | 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交 (通过spring.kafka.listener.ack-count设置触发数量) | ✔️ |
COUNT_TIME 超时或超数量 | TIME或COUNT 有一个条件满足时提交 | ✔️ |
MANUAL 手动提交(ack)后同BATCH | 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交 | ❌ 需要手动使用 Acknowledgment参数提交 |
MANUAL_IMMEDIATE 手动立即提交 | 手动调用Acknowledgment.acknowledge()后立即提交 | ❌ 需要手动使用 Acknowledgment参数提交 |
spring.kafka.producer.batch-size
spring.kafka.producer.properties.linger.ms
spring.kafka.producer.buffer-memory
batch.size
和 linger.ms
就是哪个条件先满足就都会将消息发送出去 (即高吞吐量
与延时
的平衡)KafkaProducer
发送出去的消息都是先进入到客户端本地的内存缓冲
里,
然后把很多消息收集成一个一个的Batch
,再发送到Broker
上去的,
这样性能才可能高。
注:
(1)如果buffer.memory
设置的太小,而消息写入过快导致buffer被写满,则会造成发消息线程阻塞(直到max.block.ms超时抛出异常)。
(2)如果要发送大文件的话,要同时提高buffer.memory
和batch.size
的大小。