依赖
<!-- in your <properties> block -->
<pulsar.version>2.8.1</pulsar.version>
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
client初始化
pulsarClient = PulsarClient.builder()
.serviceUrl(url)
.ioThreads(1)
.authentication(AuthenticationFactory.token(token))
.listenerThreads(8)
.operationTimeout(30, TimeUnit.SECONDS)
.connectionTimeout(15, TimeUnit.SECONDS)
.build();
- ioThreads
无特殊需求无需配置,默认值为1。netty 的 ioThreads 负责网络 IO 操作,如果业务流量较大,可以调高 ioThreads 个数; - listenersThreads
默认值为1,建议值8(默认partition数量为4,根据client中新建consumer数量决定)。该client下所有消费者共享,负责调用以 listener 模式启动的消费者的回调函数,建议配置大于该 client 负责的所有 partition 数目,否则会存在线程竞争导致消息吞吐下降; - perationTimeout
无特殊需求无需配置,默认值为30s。元数据操作的超时时间。 - connectionTimeout
无特殊需求无需配置,默认值为10s。连接 Pulsar 的超时时间。
producer初始化
Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.maxPendingMessages(500)
.blockIfQueueFull(true)
.enableBatching(true)
.compressionType(CompressionType.LZ4)
.producerName(producerName)
.create();
- maxPendingMessages
默认值1000,建议值500。生产者消息发送队列,根据实际 topic 的量级合理配置,避免在网络中断、Pulsar 故障场景下的 OOM。 - blockIfQueueFull
默认false。在maxPendingMessages达到之后,producer的Send()以及SendAsync()是阻塞住还是抛出异常,true为阻塞,false为抛出异常。 - messageRoutingMode
消息路由模式。无顺序消息要求无需配置。默认为 RoundRobinPartition。根据业务需求选择,如果需要保序,一般选择 SinglePartition,把相同 key 的消息发到同一个 partition。 - enableBatching
是否开启批发送,默认为true。如果该 topic 上消息数较小,则不建议开启 batch。需要注意的是enableBatching只在异步发送sendAsync生效,同步发送send失效。因此建议生产环境若想使用批处理,则需使用异步发送,或者多线程同步发送。 - compressionType
数据压缩类型,默认不压缩。可选择LZ4,ZLIB,ZSTD,SNAPPY。consumer端不用做改动就能消费,如果需要开启数据压缩,建议设置为LZ4,开启后大约可以降低3/4带宽消耗和存储(官方测试)。
consumer初始化
// 如果消费业务中存在耗时操作,建议放到用户线程池中执行
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer<byte[]> consumerCreated = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.receiverQueueSize(100)
.negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
.messageListener(myMessageListener)
.subscribe();
- receiverQueueSize
consumer的接收队列长度,默认值1000,建议值100。消费端处理不过来时,消费缓冲队列会积压在内存中,增加该参数可以提高consumer的吞吐能力,但是相应的会消耗更多的内存,并且会存在消息消费不均衡的情况。需要合理配置防止OOM。 - subscribeType
订阅类型,根据业务需求决定,默认为Exclusive。无特殊需求,建议设置为Shared模式。 - subscriptionInitialPosition
默认配置为Latest,测试、灰度以及生产环境sub均需提前创建,默认从Latest开始订阅。 - messageListener
- 使用 listener 模式消费,只需要提供回调函数,不需要主动执行 receive() 拉取。一般没无特殊诉求,建议采用 listener 模式。线程数为client初始化中的listenersThreads。
- ackTimeout
默认值为0。需要特别注意的是,使用默认值的时候会一直等待当前 consumer 对该条消息的 ack 或者 nack;如果一直没有 ack 或者 nack,只有在当前 consumer 下线之后才会推送给其他的 consumer,这边需要特别注意,因为如果当前 consumer 的 receiverQueue 满了则会阻塞当前 consumer 的消费。配置 ackTimeout > 0 后,当服务端推送消息但消费者未及时回复 ack 时,经过 ackTimeout 后,会重新推送给消费者处理,即 redeliver 机制。注意在利用 redeliver 机制的时候,一定要注意仅仅使用重试机制来重试可恢复的错误。举个例子,如果代码里面对消息进行解码,解码失败就不适合利用 redeliver 机制。这会导致客户端一直处于重试之中。如果拿捏不准,还可以通过下面的 deadLetterPolicy 配置死信队列,防止消息一直重试。 - negativeAckRedeliveryDelay
默认60s。当客户端调用 negativeAcknowledge 时,触发 redeliver 机制的时间,默认为立刻redeliver。 redeliver 机制的注意点同 ackTimeout。需要注意的是, ackTimeout 和 negativeAckRedeliveryDelay 建议不要同时使用,一般建议使用 negativeAck,用户可以有更灵活的控制权。一旦 ackTimeout 配置的不合理,在消费时间不确定的情况下可能会导致消息不必要的重试。 - deadLetterPolicy
配置 redeliver 的最大次数和死信 topic。