Pulsar Batching and Transactional Messages

胡和煦
2023-12-01

Batching

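Batch sending in Pulsar works differently from RocketMQ. The client buffers messages and decides by itself when to send a batch, based on the parameters the producer was created with; there is no explicit batch-send call. Sample code: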

Producer<byte[]> BatchProducer = client.newProducer().topic("mybatch-topic").batchingMaxMessages(10)
				.batchingMaxPublishDelay(10, TimeUnit.MINUTES).sendTimeout(10, TimeUnit.SECONDS).blockIfQueueFull(true)
				.enableBatching(true).create();

		for (int i = 0; i < 25; i++) {
			TypedMessageBuilder<byte[]> w = BatchProducer.newMessage().key("key" + i).value(("value" + i).getBytes())
					.property("user-defined-property", "value");

			// sendAsync only queues the message; the client decides when the batch goes out
			w.sendAsync().thenAccept(messageId -> {
				System.out.println("Published batch message: " + messageId);
			}).exceptionally(ex -> {
				System.err.println("Failed to publish: " + ex);
				return null;
			});
		}
		// flush() sends every message still waiting in the batch to the broker
		BatchProducer.flush();

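Points to note:
1. Choose a sensible batch size (batchingMaxMessages) and maximum publish delay (batchingMaxPublishDelay).
2. Consider calling flush() so that messages still queued below the batchingMaxMessages threshold are sent to the broker.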

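For the actual batching logic, see the ProducerImpl class. Several of its methods take batching into account, for example private void serializeAndSendMessage.
The method that actually performs the batch send is doBatchSendAndAdd(msg, callback, payload).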
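
To make the two triggers concrete, here is a minimal plain-Java sketch of client-side batching. It is not Pulsar's implementation: the class BatchBufferSketch, its fields, and the sink callback are hypothetical names invented for illustration. The sketch also checks the delay only when a message is added, whereas a real client would presumably use a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a client-side batch buffer (not a Pulsar class).
public class BatchBufferSketch {
    private final int maxMessages;             // plays the role of batchingMaxMessages
    private final long maxDelayMillis;         // plays the role of batchingMaxPublishDelay
    private final Consumer<List<String>> sink; // stands in for the connection to the broker
    private final List<String> pending = new ArrayList<>();
    private long firstAddedAt;

    public BatchBufferSketch(int maxMessages, long maxDelayMillis, Consumer<List<String>> sink) {
        this.maxMessages = maxMessages;
        this.maxDelayMillis = maxDelayMillis;
        this.sink = sink;
    }

    // Buffer one message; send the batch when it is full or its oldest message has waited too long.
    public void add(String msg, long nowMillis) {
        if (pending.isEmpty()) {
            firstAddedAt = nowMillis;
        }
        pending.add(msg);
        if (pending.size() >= maxMessages || nowMillis - firstAddedAt >= maxDelayMillis) {
            flush();
        }
    }

    // Send whatever is still buffered, even if the batch is not full.
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
    }

    public static void main(String[] args) {
        List<List<String>> sent = new ArrayList<>();
        // Same numbers as the producer above: 10 messages per batch, 10 minutes max delay.
        BatchBufferSketch buffer = new BatchBufferSketch(10, 600_000L, sent::add);
        for (int i = 0; i < 25; i++) {
            buffer.add("value" + i, 0L);
        }
        System.out.println("batches before flush: " + sent.size()); // 2
        buffer.flush();
        System.out.println("batches after flush: " + sent.size());  // 3
    }
}
```

With 25 messages and a batch size of 10, the last 5 messages stay in the buffer until flush() is called or the delay expires, which is why note 2 matters when the delay is long.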

Transactional Messages

Transactional messages cannot be used together with batching. A transaction has a timeout; it is created as follows:

Transaction txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();

If batching is enabled and debug logging is turned on, a message like the following may be logged over and over:

2021-11-23 16:08:01.791  INFO 1040 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl      : [mybatch-topic] [standalone-0-6] Batching the messages from the batch container with 0 messages

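The producer must also not have a send timeout. Disable it, for example with sendTimeout(0, TimeUnit.SECONDS) when creating the producer; otherwise an error like the following may appear:

Only producers disabled sendTimeout are allowed to produce transactional messages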

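A single transaction can mix messages that are delivered immediately with messages whose delivery is delayed. The complete transactional send code: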

		Transaction txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
		// Only producers disabled sendTimeout are allowed to produce transactional
		// messages
		for (int i = 0; i < 25; i++) {
			TypedMessageBuilder<byte[]> w;
			if (i % 2 == 0) {
				// even index: deliver immediately
				w = BatchProducer.newMessage(txn).key("key" + i).value(("value" + i).getBytes())
						.property("user-defined-property", "value");
			} else {
				// odd index: delay delivery by 100 seconds
				w = BatchProducer.newMessage(txn).deliverAfter(100, TimeUnit.SECONDS).key("key" + i).value(("value" + i).getBytes())
						.property("user-defined-property", "value");
			}
			w.sendAsync().thenAccept(messageId -> {
				System.out.println("Published batch message: " + messageId);
			}).exceptionally(ex -> {
				System.err.println("Failed to publish: " + ex);
				return null;
			});
		}
		// BatchProducer.flush();
		// commit() is asynchronous and returns a CompletableFuture
		txn.commit();
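
What commit means for the messages sent above can be modelled in plain Java. The sketch below assumes the usual transactional behaviour, where messages produced inside a transaction become visible to consumers only after commit and are discarded on abort. Topic and Txn are hypothetical stand-ins, not Pulsar classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of commit/abort visibility (not Pulsar code).
public class TxnVisibilitySketch {

    // Stand-in for a topic: consumers can read only committed messages.
    public static final class Topic {
        private final List<String> committed = new ArrayList<>();

        public List<String> visible() {
            return new ArrayList<>(committed);
        }
    }

    // Stand-in for a transaction: messages are staged until commit or abort.
    public static final class Txn {
        private final Topic topic;
        private final List<String> staged = new ArrayList<>();
        private boolean open = true;

        public Txn(Topic topic) {
            this.topic = topic;
        }

        public void send(String msg) {
            ensureOpen();
            staged.add(msg);
        }

        // Publish all staged messages at once.
        public void commit() {
            ensureOpen();
            topic.committed.addAll(staged);
            staged.clear();
            open = false;
        }

        // Drop all staged messages.
        public void abort() {
            ensureOpen();
            staged.clear();
            open = false;
        }

        private void ensureOpen() {
            if (!open) {
                throw new IllegalStateException("transaction already finished");
            }
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        Txn txn = new Txn(topic);
        txn.send("value0");
        txn.send("value1");
        System.out.println("visible before commit: " + topic.visible()); // []
        txn.commit();
        System.out.println("visible after commit: " + topic.visible());  // [value0, value1]
    }
}
```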

Consumer-Side Transactions

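Pulsar consumers also support transactions. The example in the official documentation commits after consuming from several topics; below is a simple sketch that consumes a group of messages and commits their acknowledgements together: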

Consumer<byte[]> consumer = client.newConsumer().topic("mybatch-topic").subscriptionName("my-subscription")
				.subscriptionType(SubscriptionType.Key_Shared).batchReceivePolicy(BatchReceivePolicy.builder()
						.maxNumMessages(5).maxNumBytes(1024 * 1024).timeout(200, TimeUnit.MILLISECONDS).build())
				.subscribe();

		// Messages<byte[]> m = consumer.batchReceive();
		int i = 0;
		Transaction txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();

		while (true) {
			// Wait for a message
			Message<byte[]> msg = consumer.receive();

			try {
				// Do something with the message
				System.out.println("Message received: " + new String(msg.getData()));

				// Acknowledge inside the transaction; the ack takes effect when the transaction commits.
				// Individual ack is used because cumulative ack is not allowed on Key_Shared subscriptions.
				consumer.acknowledgeAsync(msg.getMessageId(), txn);
			} catch (Exception e) {
				// Message failed to process, redeliver later
				consumer.negativeAcknowledge(msg);
			}
			i++;
			if (i >= 5) {
				// Commit the five acknowledgements together, then open a new transaction
				txn.commit();
				txn = client.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();

				i = 0;
			}
		}
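
One thing to watch in the loop above: it commits only after five messages, so under low traffic the transaction could reach its one-minute timeout first. A possible refinement is to commit on count or on age, whichever comes first. The plain-Java sketch below shows only that decision logic. CommitPolicySketch and its methods are hypothetical names, and using it would presumably require the loop to call consumer.receive with a timeout so the age check gets a chance to run.

```java
// Hypothetical helper that decides when the open transaction should be committed.
public class CommitPolicySketch {
    private final int maxAcks;        // commit after this many acknowledgements
    private final long maxAgeMillis;  // or once the transaction has been open this long
    private int acks;
    private long openedAt;

    public CommitPolicySketch(int maxAcks, long maxAgeMillis, long nowMillis) {
        this.maxAcks = maxAcks;
        this.maxAgeMillis = maxAgeMillis;
        this.openedAt = nowMillis;
    }

    // Call after each acknowledgement added to the transaction.
    public void recordAck() {
        acks++;
    }

    // True when the transaction is full or has been open too long.
    public boolean shouldCommit(long nowMillis) {
        return acks >= maxAcks || nowMillis - openedAt >= maxAgeMillis;
    }

    // Call after committing and opening a new transaction.
    public void reset(long nowMillis) {
        acks = 0;
        openedAt = nowMillis;
    }

    public static void main(String[] args) {
        // Commit after 5 acks or 30 seconds, well below a 1-minute transaction timeout.
        CommitPolicySketch policy = new CommitPolicySketch(5, 30_000L, 0L);
        policy.recordAck();
        System.out.println(policy.shouldCommit(1_000L));  // false: 1 ack, 1 second old
        System.out.println(policy.shouldCommit(30_000L)); // true: age limit reached
    }
}
```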