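Batch sending in Pulsar works differently from RocketMQ. Batching is essentially a client-side buffer that the client manages on its own, according to the parameters given when the producer is created; there is no explicit batch-send call. Sample code: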
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;

// Assumes `client` is an existing PulsarClient instance
Producer<byte[]> batchProducer = client.newProducer()
        .topic("mybatch-topic")
        .enableBatching(true)
        .batchingMaxMessages(10)                        // ship a batch once 10 messages are buffered
        .batchingMaxPublishDelay(10, TimeUnit.MINUTES)  // ...or when the batch timer (this period) fires
        .sendTimeout(10, TimeUnit.SECONDS)
        .blockIfQueueFull(true)
        .create();
for (int i = 0; i < 25; i++) {
    // sendAsync() only places the message in the client-side batch container
    batchProducer.newMessage()
            .key("key" + i)
            .value(("value" + i).getBytes(StandardCharsets.UTF_8))
            .property("user-defined-property", "value")
            .sendAsync()
            .thenAccept(messageId -> System.out.println("Published batch message: " + messageId))
            .exceptionally(ex -> {
                System.err.println("Failed to publish: " + ex);
                return null;
            });
}
// flush() sends every message still buffered (here the last 5) to the broker
batchProducer.flush();
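A few points to note:
1. Choose sensible values for the batch size (batchingMaxMessages) and the maximum publish delay (batchingMaxPublishDelay).
2. Call flush() where appropriate, so that messages still waiting in the queue because batchingMaxMessages has not been reached are sent to the broker.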
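The two points above can be made concrete with a small stdlib-only model of a client-side batch container. It is a simplified sketch, not Pulsar's ProducerImpl: the class BatchContainerSketch and its methods are hypothetical, and it assumes the batch timer fires every batchingMaxPublishDelay and ships whatever is buffered (modeled here as an explicit onTimer() call instead of a background thread). With batchingMaxMessages = 10 and 25 messages, two full batches go out and 5 messages stay buffered until the timer fires or flush() is called; with a 10-minute delay that wait can be long.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of client-side batching; not Pulsar's ProducerImpl.
final class BatchContainerSketch {
    private final int maxMessages;                              // stands in for batchingMaxMessages
    private final List<String> pending = new ArrayList<>();     // buffered, not yet sent
    private final List<List<String>> sent = new ArrayList<>();  // batches that "reached the broker"

    BatchContainerSketch(int maxMessages) {
        this.maxMessages = maxMessages;
    }

    // Like sendAsync(): buffer the message and ship the batch only when it is full.
    void send(String msg) {
        pending.add(msg);
        if (pending.size() >= maxMessages) {
            shipPending();
        }
    }

    // Stands in for the periodic batch timer (period = batchingMaxPublishDelay):
    // ships whatever is buffered, possibly nothing. Returns how many messages it shipped.
    int onTimer() {
        return shipPending();
    }

    // Like flush(): ship everything still buffered right now.
    void flush() {
        shipPending();
    }

    private int shipPending() {
        int n = pending.size();
        if (n > 0) {
            sent.add(new ArrayList<>(pending));
            pending.clear();
        }
        return n;
    }

    int pendingCount() { return pending.size(); }
    int sentBatchCount() { return sent.size(); }

    public static void main(String[] args) {
        BatchContainerSketch c = new BatchContainerSketch(10);
        for (int i = 0; i < 25; i++) {
            c.send("value" + i);
        }
        // Two full batches have been shipped, five messages are still buffered
        System.out.println("sent batches: " + c.sentBatchCount() + ", buffered: " + c.pendingCount());
        c.flush();
        System.out.println("sent batches: " + c.sentBatchCount() + ", buffered: " + c.pendingCount());
        // A timer tick on an empty container ships nothing
        System.out.println("timer shipped: " + c.onTimer());
    }
}
```

In this model, a timer tick on an empty container ships 0 messages, which is why a periodic batch timer can keep reporting empty batches when nothing is being sent.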
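For the details of the batching logic, see the ProducerImpl class: several of its methods take batching into account, for example private void serializeAndSendMessage.
The method that actually performs the batch send is doBatchSendAndAdd(msg, callback, payload).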
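Transactional messages cannot be used together with batching. Transactions have a timeout mechanism; a transaction is created like this:
// Requires a PulsarClient built with enableTransaction(true)
Transaction txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();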
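withTransactionTimeout(1, TimeUnit.MINUTES) bounds how long the transaction may stay open; a transaction that is neither committed nor aborted in time is aborted by the transaction coordinator. The stdlib-only model below illustrates those semantics as a sketch: TxnSketch is a hypothetical class, time is passed in explicitly as milliseconds, and messages produced inside the transaction stay invisible until commit, while a commit attempted after the timeout is expected to fail because the transaction has already been aborted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a transaction with a timeout; not Pulsar's TransactionImpl.
final class TxnSketch {
    enum State { OPEN, COMMITTED, ABORTED }

    private final long deadlineMillis;
    private final List<String> buffered = new ArrayList<>(); // produced inside the txn, not yet visible
    private final List<String> visible = new ArrayList<>();  // what a consumer could read
    private State state = State.OPEN;

    TxnSketch(long startMillis, long timeoutMillis) {
        this.deadlineMillis = startMillis + timeoutMillis;
    }

    // Models the coordinator's timeout: an open transaction past its deadline
    // is aborted and everything produced in it is discarded.
    private void expireIfDue(long nowMillis) {
        if (state == State.OPEN && nowMillis >= deadlineMillis) {
            state = State.ABORTED;
            buffered.clear();
        }
    }

    void send(String msg, long nowMillis) {
        expireIfDue(nowMillis);
        if (state != State.OPEN) {
            throw new IllegalStateException("transaction is " + state);
        }
        buffered.add(msg);
    }

    void commit(long nowMillis) {
        expireIfDue(nowMillis);
        if (state != State.OPEN) {
            throw new IllegalStateException("transaction is " + state);
        }
        visible.addAll(buffered); // everything becomes visible at once
        buffered.clear();
        state = State.COMMITTED;
    }

    State state() { return state; }
    List<String> visible() { return Collections.unmodifiableList(visible); }

    public static void main(String[] args) {
        long timeout = 60_000L; // same as withTransactionTimeout(1, TimeUnit.MINUTES)

        TxnSketch ok = new TxnSketch(0L, timeout);
        ok.send("value0", 1_000L);
        ok.send("value1", 2_000L);
        System.out.println("before commit: " + ok.visible());
        ok.commit(30_000L);
        System.out.println("after commit:  " + ok.visible());

        TxnSketch late = new TxnSketch(0L, timeout);
        late.send("value0", 1_000L);
        try {
            late.commit(61_000L); // past the 1-minute timeout
        } catch (IllegalStateException e) {
            System.out.println("late commit failed: " + e.getMessage());
        }
    }
}
```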
If batching is enabled and debug logging is turned on, a line like the following may be printed over and over:
2021-11-23 16:08:01.791 INFO 1040 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl : [mybatch-topic] [standalone-0-6] Batching the messages from the batch container with 0 messages
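Also, the producer must not set a send timeout: sendTimeout has to be disabled (set to 0), otherwise you may get this error:
Only producers disabled sendTimeout are allowed to produce transactional messages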
A transaction can mix immediately delivered and delayed messages. The complete transactional send code is:
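import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.Transaction;

// Assumes `client` was built with enableTransaction(true).
// Only producers with sendTimeout disabled are allowed to produce transactional messages,
// and batching is turned off because it cannot be combined with transactions.
Producer<byte[]> txnProducer = client.newProducer()
        .topic("mybatch-topic")
        .enableBatching(false)
        .sendTimeout(0, TimeUnit.SECONDS)
        .create();
Transaction txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
for (int i = 0; i < 25; i++) {
    TypedMessageBuilder<byte[]> builder = txnProducer.newMessage(txn);
    if (i % 2 != 0) {
        // Odd-numbered messages are delayed by 100 seconds
        builder = builder.deliverAfter(100, TimeUnit.SECONDS);
    }
    builder.key("key" + i)
            .value(("value" + i).getBytes(StandardCharsets.UTF_8))
            .property("user-defined-property", "value")
            .sendAsync()
            .thenAccept(messageId -> System.out.println("Published transactional message: " + messageId))
            .exceptionally(ex -> {
                System.err.println("Failed to publish: " + ex);
                return null;
            });
}
// None of these messages is visible to consumers until the transaction commits
txn.commit().get();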
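To reason about when the 25 messages above become consumable, here is a small stdlib-only calculation. It rests on two assumptions: deliverAfter() fixes the delivery time relative to when the message is sent, and a transactional message is not dispatched before its transaction commits. Under those assumptions a message becomes consumable at max(commit time, send time + delay). TxnDelaySketch and its methods are hypothetical names, and all times are plain milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of when transactional messages become consumable; not a Pulsar API.
final class TxnDelaySketch {
    // Consumable once the commit has happened AND the delivery time has passed.
    static long visibleAt(long sendMillis, long delayMillis, long commitMillis) {
        long deliverAt = sendMillis + delayMillis; // assumed: delay counts from send time
        return Math.max(commitMillis, deliverAt);
    }

    // Mirrors the loop above: even i sent immediately, odd i delayed by 100 s, all sent at t = 0.
    static List<Long> visibleTimes(int count, long commitMillis) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            long delay = (i % 2 == 0) ? 0L : 100_000L;
            times.add(visibleAt(0L, delay, commitMillis));
        }
        return times;
    }

    // How many messages a consumer could have received by time nowMillis.
    static long countVisible(List<Long> times, long nowMillis) {
        return times.stream().filter(t -> t <= nowMillis).count();
    }

    public static void main(String[] args) {
        List<Long> times = visibleTimes(25, 2_000L); // commit 2 s after sending
        System.out.println("visible at t=1s:   " + countVisible(times, 1_000L));   // before commit
        System.out.println("visible at t=50s:  " + countVisible(times, 50_000L));  // immediate ones only
        System.out.println("visible at t=100s: " + countVisible(times, 100_000L)); // all of them
    }
}
```

Under these assumptions nothing is consumable before the commit, the 13 immediate messages become consumable at the commit, and the 12 delayed ones only after their 100-second delay.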
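Pulsar consumption also supports transactions. The example in the official documentation commits after consuming from several topics; below is a simple example that acknowledges messages in groups and commits each group together: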
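import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.Transaction;

// Key_Shared does not support cumulative acknowledgment, so each message is acknowledged
// individually inside the transaction (acknowledgeCumulativeAsync needs Exclusive or Failover).
Consumer<byte[]> consumer = client.newConsumer()
        .topic("mybatch-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Key_Shared)
        // batchReceivePolicy only applies to consumer.batchReceive(), not to receive()
        .batchReceivePolicy(BatchReceivePolicy.builder()
                .maxNumMessages(5)
                .maxNumBytes(1024 * 1024)
                .timeout(200, TimeUnit.MILLISECONDS)
                .build())
        .subscribe();
// Alternative: Messages<byte[]> batch = consumer.batchReceive();
int count = 0;
Transaction txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
while (true) {
    // Wait for a message
    Message<byte[]> msg = consumer.receive();
    try {
        // Do something with the message
        System.out.println("Message received: " + new String(msg.getData(), StandardCharsets.UTF_8));
        // Acknowledge inside the transaction; the ack only takes effect when the transaction commits
        consumer.acknowledgeAsync(msg.getMessageId(), txn).get();
    } catch (Exception e) {
        // Message failed to process, redeliver later
        consumer.negativeAcknowledge(msg);
    }
    count++;
    if (count >= 5) {
        // Commit the last 5 acknowledgments together, then open a new transaction.
        // If 5 messages do not arrive within the 1-minute timeout, the transaction is aborted and commit fails.
        txn.commit().get();
        txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
        count = 0;
    }
}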
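The consumer loop above groups 5 acknowledgments per transaction. The stdlib-only model below (TxnAckSketch is a hypothetical class, not part of the Pulsar client) illustrates the intended semantics, assuming that a commit makes all acknowledgments in the transaction take effect together, while an abort, for example after the 1-minute timeout, rolls them back so the same messages are delivered again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical model of "acknowledge N messages in a transaction, then commit"; not a Pulsar API.
final class TxnAckSketch {
    private final Deque<Integer> backlog = new ArrayDeque<>();  // not yet durably acknowledged
    private final List<Integer> inTxn = new ArrayList<>();      // acknowledged inside the open txn
    private final List<Integer> committed = new ArrayList<>();  // acknowledgments made durable

    TxnAckSketch(int messages) {
        for (int i = 0; i < messages; i++) {
            backlog.addLast(i);
        }
    }

    // Returns the next message id, or null if nothing is waiting.
    Integer receive() { return backlog.pollFirst(); }

    void ackInTxn(int id) { inTxn.add(id); }

    // Commit: all acknowledgments in the transaction take effect together.
    void commit() {
        committed.addAll(inTxn);
        inTxn.clear();
    }

    // Abort: acknowledgments are rolled back and the messages are redelivered in order.
    void abort() {
        for (int i = inTxn.size() - 1; i >= 0; i--) {
            backlog.addFirst(inTxn.get(i));
        }
        inTxn.clear();
    }

    List<Integer> committed() { return Collections.unmodifiableList(committed); }

    public static void main(String[] args) {
        TxnAckSketch s = new TxnAckSketch(10);
        for (int k = 0; k < 5; k++) s.ackInTxn(s.receive());
        s.commit();                           // messages 0-4 are acknowledged for good
        for (int k = 0; k < 5; k++) s.ackInTxn(s.receive());
        s.abort();                            // e.g. the transaction timed out
        System.out.println("committed: " + s.committed());
        System.out.println("redelivered next: " + s.receive());
    }
}
```

Grouping acknowledgments this way trades latency for fewer commits: a larger group means fewer transactions, but more messages are redelivered when one group is aborted.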