当前位置: 首页 > 知识库问答 >
问题:

从Apache Camel中的特定偏移量开始阅读Kafka主题

曹振
2023-03-14

我阅读了Kafka的所有文档,我读到的唯一方法是git和指定

    public void configure() throws Exception {
                    from("kafka:" + TOPIC
                                 + "?groupId=A"
                                 + "&autoOffsetReset=earliest"             // Ask to start from the beginning if we have unknown offset
                                 + "&consumersCount=2"                     // We have 2 partitions, we want 1 consumer per partition
                                 + "&offsetRepository=#offset")            // Keep the offset in our repository
                            .to("mock:result");

}

但是为了客户的订单,我需要使用Spring,所以我的Kafkaendpoint是这样的

<!--DEFINE KAFKA'S TOPCIS AS ENDPOINT-->
        <endpoint id="tagBlink" uri="kafka:10.0.0.165:9092">
            <property key="topic" value="tagBlink"/>
            <property key="brokers" value="10.0.0.165:9092"/>
            <property key="offsetRepository" value="100"/>
        </endpoint>

但是得到一个例外

无法为属性找到合适的setter:offsetRepository,因为没有具有相同类型的setter方法:java.lang.String也不可能进行类型转换:没有类型转换器可用于从类型转换:java.lang.String到所需类型:<--plhd-2/>StateRepository with value 100

这可能与我的当前配置吗?我如何从特定的偏移恢复??

共有2个答案

仉宪
2023-03-14

重要的词是“repository”而不是“offset”:它不是一个整数值,而是对一个bean的引用,该bean指定了偏移量的持久化位置。

(非Spring)示例

// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));

// Bind this repository into the Camel registry
JndiRegistry registry = new JndiRegistry();
registry.bind("offsetRepo", repository);

// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
    @Override
    public void configure() throws Exception {
        from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
                     "&groupId=A" +                            //
                     "&autoOffsetReset=earliest" +             // Ask to start from the beginning if we have unknown offset
                     "&offsetRepository=#offsetRepo")          // Keep the offsets in the previously configured repository
                .to("mock:result");
    }
});
方焱
2023-03-14

在这段时间之后,我设法解决了这个问题。为此,我遵循了Springbean的创建过程,并检查了FileStateRepository的文档。我需要一个文件,所以我创建了一个文件Bean,并添加为构造函数arg。之后,我添加了一个init method=“doStart”。这个方法加载一个文件(如果存在),如果不存在,它将创建该文件。

     <endpoint id="event" uri="kafka:localhost:9092">
        <property key="topic" value="eventTopic4"/>
        <property key="brokers" value="localhost:9092"/>
        <property key="autoOffsetReset" value="earliest"/>
        <property key="offsetRepository" value="#myRepo2"/>
    </endpoint>

    <bean id="myFileOfMyRepo" class="java.io.File">
        <constructor-arg type="java.lang.String" value="C:\repoDat\repo.dat"/>
    </bean>

    <bean id="myRepo2" class="org.apache.camel.impl.FileStateRepository " factory-method="fileStateRepository" init-method="doStart">
        <constructor-arg ref="myFileOfMyRepo"/>
    </bean>

在这之后,我在Git中看到了Kafka骆驼消费者的代码。

    offsetRepository.getState(serializeOffsetKey(topicPartition));
    if (offsetState != null && !offsetState.isEmpty()) {
        // The state contains the last read offset so you need to seek from the next one
        long offset = deserializeOffsetValue(offsetState) + 1;
        log.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
        consumer.seek(topicPartition, offset);
    } 

有了这个,我设法读到了最后一个偏移量。我希望为Kafka增加这些额外的步骤。

 类似资料:
  • 我的spring boot项目有一个演示Kafka Streams API的应用程序。我可以使用以下命令使用主题中的所有消息 Kafka Streams API中使用KStream或ktable使用消息的类似命令是什么?我试过了 两者都不起作用。我确实创建了一个测试用例,用而不是流来使用,但它不起作用。代码上传到Github以供参考。任何帮助都会很好。

  • 我想打印Flink已开始读取的Kafka主题的每个分区的起始偏移量?

  • 我想从话题的一开始就开始消费。我已经将属性“AUTO\u OFFSET\u RESET\u CONFIG”设置为最早,但不知何故它仍然没有从一开始就读取。 如果我错过了什么,有什么想法吗?我每次都在创造一个新的消费群体。

  • 有没有其他方法可以做到这一点?

  • 我是 kafka 的新手,并试图了解是否有办法从上次使用的偏移量读取消息,但不是从头开始。 我正在写一个例子,这样我的意图就不会偏离。 有没有一种方法可以获取从上次使用的偏移量生成的消息。?

  • 问题内容: 嘿,我正在尝试打开文件,仅从偏移量读取一定长度!我阅读了以下主题: 如何使用Java中的文件中的特定行号读取特定行? 在那儿,它说在不读取之前就不可能读取某行,但是我想知道字节! 是否可以从已知偏移量读取某些字节? 问题答案: RandomAccessFile提供一个功能: