pulsar最佳实践

龙永逸
2023-12-01

依赖

<!-- in your <properties> block -->
<pulsar.version>2.8.1</pulsar.version>

<!-- in your <dependencies> block -->
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>${pulsar.version}</version>
</dependency>

client初始化

pulsarClient = PulsarClient.builder()
                   .serviceUrl(url)
                   .ioThreads(1)
                   .authentication(AuthenticationFactory.token(token))
                   .listenerThreads(8)
                   .operationTimeout(30, TimeUnit.SECONDS)
                   .connectionTimeout(15, TimeUnit.SECONDS)
                   .build();
  1. ioThreads
    无特殊需求无需配置,默认值为1。netty 的 ioThreads 负责网络 IO 操作,如果业务流量较大,可以调高 ioThreads 个数;
  2. listenersThreads
    默认值为1,建议值8(默认partition数量为4,根据client中新建consumer数量决定)。该client下所有消费者共享,负责调用以 listener 模式启动的消费者的回调函数,建议配置大于该 client 负责的所有 partition 数目,否则会存在线程竞争导致消息吞吐下降;
  3. perationTimeout
    无特殊需求无需配置,默认值为30s。元数据操作的超时时间。
  4. connectionTimeout
    无特殊需求无需配置,默认值为10s。连接 Pulsar 的超时时间。

producer初始化

Producer<byte[]> producer = client.newProducer()
                .topic(topicName)
                .maxPendingMessages(500)
                .blockIfQueueFull(true)
                .enableBatching(true)
                .compressionType(CompressionType.LZ4)
                .producerName(producerName)
                .create();
  1. maxPendingMessages
    默认值1000,建议值500。生产者消息发送队列,根据实际 topic 的量级合理配置,避免在网络中断、Pulsar 故障场景下的 OOM。
  2. blockIfQueueFull
    默认false。在maxPendingMessages达到之后,producer的Send()以及SendAsync()是阻塞住还是抛出异常,true为阻塞,false为抛出异常。
  3. messageRoutingMode
    消息路由模式。无顺序消息要求无需配置。默认为 RoundRobinPartition。根据业务需求选择,如果需要保序,一般选择 SinglePartition,把相同 key 的消息发到同一个 partition。
  4. enableBatching
    是否开启批发送,默认为true。如果该 topic 上消息数较小,则不建议开启 batch。需要注意的是enableBatching只在异步发送sendAsync生效,同步发送send失效。因此建议生产环境若想使用批处理,则需使用异步发送,或者多线程同步发送。
  5. compressionType
    数据压缩类型,默认不压缩。可选择LZ4,ZLIB,ZSTD,SNAPPY。consumer端不用做改动就能消费,如果需要开启数据压缩,建议设置为LZ4,开启后大约可以降低3/4带宽消耗和存储(官方测试)。

consumer初始化

// 如果消费业务中存在耗时操作,建议放到用户线程池中执行
MessageListener myMessageListener = (consumer, msg) -> {
            try {
                System.out.println("Message received: " + new String(msg.getData()));
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        };
Consumer<byte[]> consumerCreated = pulsarClient.newConsumer()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                .receiverQueueSize(100)
                .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
                .messageListener(myMessageListener)
                .subscribe();
  1. receiverQueueSize
    consumer的接收队列长度,默认值1000,建议值100。消费端处理不过来时,消费缓冲队列会积压在内存中,增加该参数可以提高consumer的吞吐能力,但是相应的会消耗更多的内存,并且会存在消息消费不均衡的情况。需要合理配置防止OOM。
  2. subscribeType
    订阅类型,根据业务需求决定,默认为Exclusive。无特殊需求,建议设置为Shared模式。
  3. subscriptionInitialPosition
    默认配置为Latest,测试、灰度以及生产环境sub均需提前创建,默认从Latest开始订阅。
  4. messageListener
  5. 使用 listener 模式消费,只需要提供回调函数,不需要主动执行 receive() 拉取。一般没无特殊诉求,建议采用 listener 模式。线程数为client初始化中的listenersThreads。
  6. ackTimeout
    默认值为0。需要特别注意的是,使用默认值的时候会一直等待当前 consumer 对该条消息的 ack 或者 nack;如果一直没有 ack 或者 nack,只有在当前 consumer 下线之后才会推送给其他的 consumer,这边需要特别注意,因为如果当前 consumer 的 receiverQueue 满了则会阻塞当前 consumer 的消费。配置 ackTimeout > 0 后,当服务端推送消息但消费者未及时回复 ack 时,经过 ackTimeout 后,会重新推送给消费者处理,即 redeliver 机制。注意在利用 redeliver 机制的时候,一定要注意仅仅使用重试机制来重试可恢复的错误。举个例子,如果代码里面对消息进行解码,解码失败就不适合利用 redeliver 机制。这会导致客户端一直处于重试之中。如果拿捏不准,还可以通过下面的 deadLetterPolicy 配置死信队列,防止消息一直重试。
  7. negativeAckRedeliveryDelay
    默认60s。当客户端调用 negativeAcknowledge 时,触发 redeliver 机制的时间,默认为立刻redeliver。 redeliver 机制的注意点同 ackTimeout。需要注意的是, ackTimeout 和 negativeAckRedeliveryDelay 建议不要同时使用,一般建议使用 negativeAck,用户可以有更灵活的控制权。一旦 ackTimeout 配置的不合理,在消费时间不确定的情况下可能会导致消息不必要的重试。
  8. deadLetterPolicy
    配置 redeliver 的最大次数和死信 topic。
 类似资料: