Pulsar 从指定位置（时间点）开始消费

郜彦
2023-12-01

Pulsar Client API

示例：通过 `Consumer.seek(timestamp)` 将订阅位置重置到 5 小时之前，并从该位置开始消费。

/**
 * Example consumer that subscribes to a topic, resets the subscription to the publish time
 * five hours ago via {@code Consumer.seek(long)}, and then prints and acknowledges every
 * message it receives from that point on.
 */
public class SeekConsumer {
    /** Pulsar admin (HTTP) endpoint; not used by this example. */
    public static String adminUrl = "http://broker1:8080";
    /** Pulsar binary-protocol endpoint the client connects to. */
    public static String serviceUrl = "pulsar://broker2:6650";

    /**
     * Runs the consume loop until an error occurs.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the consumer and then the client are closed on every
        // exit path (e.g. a failed subscribe(), seek() or receive()), so connections and
        // client threads are not leaked. Both close() methods throw PulsarClientException,
        // which the catch below already handles.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                     .topic("persistent://public/default/test-reset-cursor")
                     .subscriptionName("seek test")
                     // Only applies when the subscription is created; seek() below overrides it.
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                     .subscribe()) {

            // Reset the subscription to the publish time 5 hours ago (epoch milliseconds).
            long fiveHoursAgo = Instant.now().minus(Duration.ofHours(5)).toEpochMilli();
            consumer.seek(fiveHoursAgo);

            while (true) {
                final Message<String> msg = consumer.receive();

                System.out.printf(
                        "Message received: key=%s, value=%s, topic=%s, id=%s%n",
                        msg.getKey(),
                        msg.getValue(),
                        msg.getTopicName(),
                        msg.getMessageId().toString());
                consumer.acknowledge(msg);
            }

        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}

命令行

$ ./bin/pulsar-admin topics stats persistent://public/default/test-reset-cursor
$ ./bin/pulsar-admin topics reset-cursor -s "flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490" -t 1d persistent://public/default/test-reset-cursor

应用场景

在 pulsar-flink-connector 中，将消费的起始位置指定为某个特定时间点之后的位置，而不是 Latest 或 Earliest。

方案一

使用一个已存在且不再使用的订阅，重置该订阅的 cursor。

# 首先查看该topic的stats
$ ./bin/pulsar-admin topics stats persistent://public/default/test-reset-cursor
# 节选
"flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 0,
      "msgBacklogNoDelayed" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "msgRateExpired" : 0.3333234752648865,
      "lastExpireTimestamp" : 1631706111555,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { }
    },


# 找到其中一个订阅，最好是 msgBacklog 为 0 的订阅；如果没有，则跳过以下步骤，直接使用方案二（新建一个从 latest 开始的订阅）
# 以下将该订阅的 cursor 回退到 1 天之前
$ ./bin/pulsar-admin topics reset-cursor -s "flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490" -t 1d persistent://public/default/test-reset-cursor
# 再次查看即可发现该订阅的cursor被重置了
"flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 200,
      "msgBacklogNoDelayed" : 200,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "msgRateExpired" : 0.3333234752648865,
      "lastExpireTimestamp" : 1631706111555,
      "lastConsumedFlowTimestamp" : 0,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { }
    },

方案二

新建订阅并重置该订阅的 cursor

# 创建一个订阅（默认消费位置为 latest），然后用 topics stats 查看该订阅，节选如下
$ ./bin/pulsar-client consume persistent://public/default/test-reset-cursor -s "test" -n 10
"test" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 0,
      "msgBacklogNoDelayed" : 0,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1631707344242,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { }
    },

# 重置消费位置为12h之前
$ ./bin/pulsar-admin topics reset-cursor -s "test" -t 12h persistent://public/default/test-reset-cursor
$ ./bin/pulsar-admin topics stats persistent://public/default/test-reset-cursor
"test" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "msgRateRedeliver" : 0.0,
      "chuckedMessageRate" : 0,
      "msgBacklog" : 200,
      "msgBacklogNoDelayed" : 200,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 0,
      "type" : "Exclusive",
      "msgRateExpired" : 0.0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1631707344242,
      "lastConsumedTimestamp" : 0,
      "lastAckedTimestamp" : 0,
      "consumers" : [ ],
      "isDurable" : true,
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : { }
    },
    
$ ./bin/pulsar-admin topics stats-internal persistent://public/default/test-reset-cursor
"test" : {
      "markDeletePosition" : "849:-1",
      "readPosition" : "849:0",
      "waitingReadOp" : false,
      "pendingReadOps" : 0,
      "messagesConsumedCounter" : 0,
      "cursorLedger" : 886,
      "cursorLedgerLastEntry" : 3,
      "individuallyDeletedMessages" : "[]",
      "lastLedgerSwitchTimestamp" : "2021-09-15T20:02:24.224+08:00",
      "state" : "Open",
      "numberOfEntriesSinceFirstNotAckedMessage" : 1,
      "totalNonContiguousDeletedMessagesRange" : 0,
      "properties" : { }
    }

方案一和方案二对应的代码

在代码中指定已重置 cursor 的订阅名称即可。

   public static void main(String[] args) {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("topic", "persistent://public/default/test-reset-cursor");
        props.setProperty("partition.discovery.interval-millis", "5000");

        FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
                servericeUrl,
                adminUrl,
                new SimpleStringSchema(),
                props
        );

       // source.setStartFromSubscription("test"); # 改为自己重置指标的位置
        source.setStartFromSubscription("flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490"); # 改为自己重置指标的位置即可
//        source.setStartFromEarliest();

        DataStream<String> sourceStream = see.addSource(source);

        sourceStream.print();

        try {
            see.execute("test-pulsar-source-sink");
        }catch (Exception e) {
            e.printStackTrace();
        }
    }  
 类似资料: