当前位置: 首页 > 知识库问答 >
问题:

Camel kafka与Spring FileStateRepository

郗浩言
2023-03-14

我试图将Kafka偏移量保存到文件中,我使用Spring Boot,似乎偏移量在文件中写入,但没有读取,所以事实上骆驼将在重新启动时从Kafka主题的开头开始读取

@Component
public class Route extends RouteBuilder {
  @Override
  public void configure() throws Exception {
    from(kafka())
            .to("log:TEST?level=INFO")
            .process(Route::commitKafka);
}

private String kafka() {

    String kafkaEndpoint = "kafka:";

    kafkaEndpoint += "topic";
    kafkaEndpoint += "?brokers=";
    kafkaEndpoint += "localhost:9092";
    kafkaEndpoint += "&groupId=";
    kafkaEndpoint += "TEST";
    kafkaEndpoint += "&autoOffsetReset=";
    kafkaEndpoint += "earliest";
    kafkaEndpoint += "&autoCommitEnable=";
    kafkaEndpoint += false;
    kafkaEndpoint += "&allowManualCommit=";
    kafkaEndpoint += true;
    kafkaEndpoint += "&offsetRepository=";
    kafkaEndpoint += "#fileStore";

    return kafkaEndpoint;
}

@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
    FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
     // This will be empty
     // System.out.println(fileStateRepository.getCache());
    return fileStateRepository;
}

private static void commitKafka(Exchange exchange) {
      KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
      manual.commitSync();
  }
}

共有1个答案

鲍建业
2023-03-14

我终于找到了一个解决方案,但它没有出现在文档中,必须调用start方法才能在启动时初始化repo

@Bean(name = "fileStore")
private FileStateRepository fileStateRepository() {
    FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));

    try {
        fileStateRepository.start();
    } catch (Exception e) {
        e.printStackTrace();
    }


    return fileStateRepository;
}
 类似资料:
  • 在C语言中,假设每个算法被赋予完全相同的一组进程,那么先到先得、最短作业优先和循环之间的周转时间是否相等?还是调度算法不同?

  • 问题内容: 为了为 HTML5 Doctype 定义字符集,我应该使用哪种表示法? 短: 长: 问题答案: 在HTML5中,它们是等效的。使用较短的一个,更容易记住和键入。浏览器支持很好,因为它是为向后兼容而设计的。

  • 连接的多个输入都相当于Yes的时候才会输出Yes。 用法 Your browser does not support the video tag. 案例:小闹钟 功能:今天15:10:00,响起猫叫声小闹钟 工作原理 当所有的输入都是Yes的时候,与节点才输出Yes。

  • 问题内容: 似乎有三种 相同的 方法可以独立于平台获取依赖于平台的“文件分隔符”: 我们如何决定何时使用哪个? 它们之间甚至有什么区别吗? 问题答案: 可以通过调用命令行参数或使用命令行参数覆盖 获取默认文件系统的分隔符。 获取默认文件系统。 获取文件系统的分隔符。请注意,作为一种实例方法,在需要代码在一个JVM中对多个文件系统进行操作的情况下,可以使用该方法将不同的文件系统传递给代码(而不是默认

  • 问题内容: 我今天刚刚与一些同事讨论了python的db-api fetchone vs fetchmany vs fetchall。 我确定每个应用程序的用例都取决于我正在使用的db-api的实现,但是总的来说,fetchone,fetchmany,fetchall的用例是什么? 换句话说,以下等效项是什么?还是其中之一比其他人更受青睐?如果是这样,在哪些情况下? 问题答案: 我认为这确实取决于

  • 问题内容: 即时创建元素并能够移动元素的最佳方法是什么?例如,假设我要创建一个矩形,圆形和多边形,然后选择这些对象并四处移动。 我了解HTML5提供了三个使之成为可能的元素:svg,canvas和div。对于我想做什么,这些元素中的哪一个将提供最佳性能? 为了比较这些方法,我正在考虑创建三个视觉上相同的网页,每个网页中都有页眉,页脚,小部件和文本内容。第一页中的小部件将完全使用元素创建,第二页中的

  • 问题内容: 之间有什么区别,和? 问题答案: 将等待,直到所有资产完成下载为止,例如图像和脚本。 DOM准备就绪,直到您可以通过API访问DOM为止。 作为一个方面说明,在当今这个时代,你应该使用或较旧的IE。