Consumer 向 broker 发送消息流获取申请(flow permit request)以获取消息。 在 Consumer 端有一个队列,用于接收从 broker 推送来的消息。 你能够通过receiverQueueSize参数配置队列的长度 (队列的默认长度是1000) 每当 consumer.receive() 被调用一次,就从缓冲区(buffer)获取一条消息。
可以通过同步(sync) 或者异步(async)的方式从brokers接受消息。
发送模式 | 说明 |
---|---|
同步接收 | 同步模式,在收到消息之前都是被阻塞的。 |
异步接收 | 异步接收模式会立即返回一个 future 值(如 Java 中的 CompletableFuture),一旦收到新的消息就立刻完成。 |
客户端库为使用者提供侦听器实现。例如,Java客户端提供了一个MessageListener接口。在这个接口中,一旦接受到新的消息,received方法将被调用。
消费者在成功消费一个消息后,向 Broker 发送一个确认请求。 然后,这条被消费的消息将被永久保存,只有在所有订阅者都确认后才会被删除。 如果希望消息被消费者确认后仍然保留下来,可配置消息保留策略实现。
对于批处理消息,你可以启用批处理索引确认,以避免将确认的消息分派给消费者。 关于批量索引确认的细节,请参见batching。
消息可以通过以下两种方式之一进行确认。
如果你想单独确认消息,你可以使用以下API。
consumer.acknowledge(msg);
如果你想累计确认消息,你可以使用以下API。
consumer.acknowledgeCumulative(msg);
Note
在共享订阅模式中不能使用累积确认,因为共享订阅模式涉及多个可以访问同一订阅的使用者。在共享订阅模式下,消息会被单独确认。
当一个消费者未能消费一个消息并打算再次消费它时,这个消费者应该向 Broker 发送一个否定的确认。 然后,Broker 将把这个消息重新传递给消费者。
根据消费订阅模式,单独或累积地对消息进行负面确认。
在独占和故障转移订阅模式下,使用者只会消极地确认他们收到的最后一条消息。
在共享和密钥共享订阅模式中,消费者可以单独否定地确认消息。
请注意,订单订阅类型为否定, 比如Exclusive,Failover和Key_Shared之类的消息可能会导致发送失败的消息以不符合原始顺序的方式到达使用者。
如果你想否定地确认信息,你可以使用以下API。
//调用这个api,消息会被否定地确认。
consumer.negativeAcknowledge(msg);
Note
如果启用了批处理,则一批中的所有消息都将重新传递给使用者。
如果一个消息没有被成功消费,而你想让 Broker 自动重新交付这个消息,那么你可以为未被认可的消息启用自动重新交付机制。 在启用自动重新交付的情况下,客户端跟踪整个acktimeout时间范围内的未确认的消息,并在指定确认超时时向代理发送重新交付未确认的消息请求。
Note
如果启用了批处理,则一批中的所有消息都将重新传递给使用者。
否定确认优于确认超时,因为否定确认更精确地控制单个消息的重新传递,并在消息处理时间超过确认超时时避免无效的重新传递。
当消费者无法成功消费某些messages时,死信主题使您能够消费新messages。在这种机制中,无法使用的消息存储在单独的主题中,称为死信主题。您可以决定如何处理死信主题中的消息。
以下示例显示了如何使用默认死信主题在Java客户端中启用死信topic:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
默认死信主题使用以下格式:
<topicname>-<subscriptionname>-DLQ
如果要指定死信主题的名称,请使用以下Java客户端示例:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.deadLetterTopic("your-topic-name")
.build())
.subscribe();
死信主题依赖消息重试由于确认超时或否定确认,消息被重新传递。如果要对消息使用否定确认,请确保在确认超时之前对其进行否定确认。
Note
目前,死信主题在共享和密钥共享订阅模式下启用。
很多在线的业务系统,由于业务逻辑处理出现异常,消息一般需要被重新消费。 若需要允许延时重新消费失败的消息,你可以配置生产者同时发送消息到业务主题和重试主题,并允许消费者自动重试消费。 配置了允许消费者自动重试。如果消息没有被消费成功,它将被保存到重试主题当中。并在指定延时时间后,自动重新消费重试主题里面的消费失败消息。
默认情况下,自动重试被禁用。您可以将enableRetry设置为true,以启用对使用者的自动重试。
如下例子所示,消费者会从重试主题消费消息。
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
如果您想要将消息放入重试队列,您可以使用以下API。
consumer.reconsumeLater(msg,3,TimeUnit.SECONDS);