当前位置: 首页 > 工具软件 > Spring Kafka > 使用案例 >

Spring Kafka相关配置的注意事项【持续更新】

陆海阳
2023-12-01

消费者并发数量

spring.kafka.listener.concurrency
@KafkaListener.concurrency

仅在多partition对应单个消费端时,用于多线程消费消息(concurrency <= partition数量),
当存在多个消费端时,优先考虑让新的消费端去消费(而不是多线程,即便设置concurrency > 1也仅有唯一消费线程生效),
所以如果消费端个数本身就大于等于分区数了,那么concurrency这个参数再设置大于1就是浪费(可保持默认1)

消费者提交已消费消息offset

spring.kafka.consumer.enable-auto-commit
spring.kafka.conusmer.auto-commit-interval
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.ack-count

  • spring.kafka.consumer.enbable-auto-commit
    • true 自动提交已消费消息offset
      • auto-commit-interval 设置自动提交间隔
    • fasle 由程序控制已消费消息offset提交
      • spring.kafka.listener.ack-mode 已消费offset提交模式

ack-mode列表(详细说明参见:SpringKafka - Committing Offsets

ack-mode模式说明自动提交
RECORD
单记录
当每一条记录被消费者监听器(ListenerConsumer)处理之后提交✔️
BATCH(默认)
批量
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交✔️
TIME
超时
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
(通过spring.kafka.listener.ack-time设置触发时间)
✔️
COUNT
超过消费数量
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
(通过spring.kafka.listener.ack-count设置触发数量)
✔️
COUNT_TIME
超时或超数量
TIME或COUNT 有一个条件满足时提交✔️
MANUAL
手动提交(ack)后同BATCH
当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
需要手动使用
Acknowledgment参数提交
MANUAL_IMMEDIATE
手动立即提交
手动调用Acknowledgment.acknowledge()后立即提交
需要手动使用
Acknowledgment参数提交

生产者批量发送消息

spring.kafka.producer.batch-size
spring.kafka.producer.properties.linger.ms
spring.kafka.producer.buffer-memory

  • spring.kafka.producer.batch-size
    批量发送消息大小(被发送到同一partition),默认16KB
  • spring.kafka.producer.properties.linger.ms
    延时发送毫秒数(可结合batch.size以保证即使未到达batchSize也会在指定时间内发送消息)
    注: 同时设置batch.sizelinger.ms就是哪个条件先满足就都会将消息发送出去 (即高吞吐量延时的平衡)
  • spring.kafka.producer.buffer-memory
    发送消息内存缓存区大小(缓存未发送的消息)

KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,
然后把很多消息收集成一个一个的Batch,再发送到Broker上去的,
这样性能才可能高。

buffer.memory
batch.size
batch.size
batch.size
KafkaProduer
BufferMemory
Batch1
Batch2
BatchN

注:
(1)如果buffer.memory设置的太小,而消息写入过快导致buffer被写满,则会造成发消息线程阻塞(直到max.block.ms超时抛出异常)。
(2)如果要发送大文件的话,要同时提高buffer.memorybatch.size的大小。

 类似资料: