从5个小时之前开始消费。
/**
 * Example consumer that subscribes to a topic, moves the subscription back to the
 * position corresponding to five hours before "now", and then prints and acknowledges
 * every message it receives.
 */
public class SeekConsumer {
    /** Admin HTTP endpoint. Not read anywhere in this class. */
    public static String adminUrl = "http://broker1:8080";
    /** Service URL handed to the {@code PulsarClient} builder. */
    public static String serviceUrl = "pulsar://broker2:6650";

    /**
     * Connects, seeks the subscription to five hours ago and consumes until a
     * {@code PulsarClientException} is thrown.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources closes the consumer first and then the client, so a failure
        // in seek/receive/acknowledge no longer leaks the connection.
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Consumer<String> consumer = client.newConsumer(Schema.STRING)
                     .topic("persistent://public/default/test-reset-cursor")
                     .subscriptionName("seek test")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                     .subscribe()) {
            // Seek the consumer to the publish time five hours before now.
            consumer.seek(Instant.now().minus(Duration.ofHours(5)).toEpochMilli());
            while (true) {
                final Message<String> msg = consumer.receive();
                System.out.printf(
                        "Message received: key=%s, value=%s, topic=%s, id=%s%n",
                        msg.getKey(),
                        msg.getValue(),
                        msg.getTopicName(),
                        msg.getMessageId().toString());
                consumer.acknowledge(msg);
            }
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}
$ ./bin/pulsar-admin topics stats persistent://public/default/test-reset-cursor
$ ./bin/pulsar-admin topics reset-cursor -s "flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490" -t 1d persistent://public/default/test-reset-cursor
在pulsar-flink-connector中指定消费者的位置,为某个特定时间点之后的位置,而不是Latest或者Earliest。
使用已存在且不再使用的订阅,重置该订阅的cursor。
# 首先查看该topic的stats
$ ./bin/pulsar-admin topics stats persistent://public/default/test-reset-cursor
# 节选
"flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"msgBacklog" : 0,
"msgBacklogNoDelayed" : 0,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"msgRateExpired" : 0.3333234752648865,
"lastExpireTimestamp" : 1631706111555,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false,
"consumersAfterMarkDeletePosition" : { }
},
# 找到其中的一个订阅,最好是msgBacklog为0的订阅;如果没有,则新创建一个从latest开始订阅的消费者,此时忽略以下步骤,直接使用方案二
# 以下将该订阅的cursor回退到1天之前
$ ./bin/pulsar-admin topics reset-cursor -s "flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490" -t 1d persistent://public/default/test-reset-cursor
# 再次查看即可发现该订阅的cursor被重置了
"flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"msgBacklog" : 200,
"msgBacklogNoDelayed" : 200,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"msgRateExpired" : 0.3333234752648865,
"lastExpireTimestamp" : 1631706111555,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false,
"consumersAfterMarkDeletePosition" : { }
},
新建消费者并重置消费者的cursor
# 创建一个消费者,默认消费位置latest
$ ./bin/pulsar-client consume persistent://public/default/test-reset-cursor -s "test" -n 10
"test" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"msgBacklog" : 0,
"msgBacklogNoDelayed" : 0,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1631707344242,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false,
"consumersAfterMarkDeletePosition" : { }
},
# 重置消费位置为12h之前
$ ./bin/pulsar-admin topics reset-cursor -s "test" -t 12h persistent://public/default/test-reset-cursor
$ ./bin/pulsar-admin topics stats persistent://public/default/test-reset-cursor
"test" : {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"chuckedMessageRate" : 0,
"msgBacklog" : 200,
"msgBacklogNoDelayed" : 200,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 0,
"type" : "Exclusive",
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1631707344242,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"consumers" : [ ],
"isDurable" : true,
"isReplicated" : false,
"consumersAfterMarkDeletePosition" : { }
},
$ ./bin/pulsar-admin topics stats-internal persistent://public/default/test-reset-cursor
"test" : {
"markDeletePosition" : "849:-1",
"readPosition" : "849:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : 886,
"cursorLedgerLastEntry" : 3,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2021-09-15T20:02:24.224+08:00",
"state" : "Open",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"properties" : { }
}
在代码中指定已重置cursor的订阅名即可。
public static void main(String[] args) {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("topic", "persistent://public/default/test-reset-cursor");
props.setProperty("partition.discovery.interval-millis", "5000");
FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
servericeUrl,
adminUrl,
new SimpleStringSchema(),
props
);
// source.setStartFromSubscription("test"); # 改为自己重置指标的位置
source.setStartFromSubscription("flink-pulsar-e8a0f8aa-7c86-4ab2-81a3-48b2e296c490"); # 改为自己重置指标的位置即可
// source.setStartFromEarliest();
DataStream<String> sourceStream = see.addSource(source);
sourceStream.print();
try {
see.execute("test-pulsar-source-sink");
}catch (Exception e) {
e.printStackTrace();
}
}