当前位置: 首页 > 知识库问答 >
问题:

保存ChronicleQueue的消费者/裁剪者读取偏移量

嵇丰
2023-03-14

我正在探索ChronicleQueue来保存我的一个应用程序中生成的事件。我想在经过一些处理后,将保存的事件按其原始发生顺序发布到不同的系统。我有多个应用程序实例,每个实例都可以运行一个单线程appender来将事件添加到ChronicleQueue。尽管跨实例排序是必要的,但我想理解以下两个问题。


String file = "myPath";
try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(file).build()) {
  for(int i = 0 ;i<10;i++){
    cq.acquireAppender().writeText("test"+i);
  }
}


try (ChronicleQueue cq = SingleChronicleQueueBuilder.binary(file).build()) {
    ExcerptTailer atailer = cq.createTailer("a");
    System.out.println(atailer.readText());
    System.out.println(atailer.readText());
    System.out.println(atailer.readText()); 
}

2)还需要一些建议,如果有一种方法来保持跨实例的事件顺序。

共有1个答案

薛经艺
2023-03-14

使用命名的tailer应该确保tailer只读取一条消息一次。如果你有一个不发生这种情况的例子,你能创建一个测试来重现它吗?

写入时队列中条目的顺序是固定的,所有的tailer都以相同的顺序看到相同的消息,没有任何选择。

 类似资料:
  • 如有任何帮助,我们将不胜感激。

  • 我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导) 问题是:即使我注释掉ack.acknow

  • 试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移

  • 我有多个Kafka消费者和制作人,主题不同。使用独立应用程序,我想监控Kafka消费者的延迟。 我使用Kafka0.10.0.1,因为Kafka现在存储消费者偏移Kafka本身,所以我怎么能读到相同的。 我能够读取每个分区的主题偏移量。

  • 我已经将enable.auto.commit设置为true,并将auto.commit.interval.ms设置为10,000(即10秒)。现在我的问题是--消费者是每个记录的提交偏移量,还是根据10秒内消耗的记录数提交并提前偏移量?

  • 我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个