我阅读了Kafka的所有文档,我读到的唯一方法是git和指定
public void configure() throws Exception {
from("kafka:" + TOPIC
+ "?groupId=A"
+ "&autoOffsetReset=earliest" // Ask to start from the beginning if we have unknown offset
+ "&consumersCount=2" // We have 2 partitions, we want 1 consumer per partition
+ "&offsetRepository=#offset") // Keep the offset in our repository
.to("mock:result");
}
但是为了客户的订单,我需要使用Spring,所以我的Kafkaendpoint是这样的
<!--DEFINE KAFKA'S TOPCIS AS ENDPOINT-->
<endpoint id="tagBlink" uri="kafka:10.0.0.165:9092">
<property key="topic" value="tagBlink"/>
<property key="brokers" value="10.0.0.165:9092"/>
<property key="offsetRepository" value="100"/>
</endpoint>
但是得到一个例外
无法为属性找到合适的setter:offsetRepository,因为没有具有相同类型的setter方法:java.lang.String也不可能进行类型转换:没有类型转换器可用于从类型转换:java.lang.String到所需类型:<--plhd-2/>StateRepository with value 100
这可能与我的当前配置吗?我如何从特定的偏移恢复??
重要的词是“repository”而不是“offset”:它不是一个整数值,而是对一个bean的引用,该bean指定了偏移量的持久化位置。
(非Spring)示例
// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
// Bind this repository into the Camel registry
JndiRegistry registry = new JndiRegistry();
registry.bind("offsetRepo", repository);
// Configure the camel context
DefaultCamelContext camelContext = new DefaultCamelContext(registry);
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
"&groupId=A" + //
"&autoOffsetReset=earliest" + // Ask to start from the beginning if we have unknown offset
"&offsetRepository=#offsetRepo") // Keep the offsets in the previously configured repository
.to("mock:result");
}
});
在这段时间之后,我设法解决了这个问题。为此,我遵循了Springbean的创建过程,并检查了FileStateRepository
的文档。我需要一个文件,所以我创建了一个文件Bean,并添加为构造函数arg。之后,我添加了一个init method=“doStart”
。这个方法加载一个文件(如果存在),如果不存在,它将创建该文件。
<endpoint id="event" uri="kafka:localhost:9092">
<property key="topic" value="eventTopic4"/>
<property key="brokers" value="localhost:9092"/>
<property key="autoOffsetReset" value="earliest"/>
<property key="offsetRepository" value="#myRepo2"/>
</endpoint>
<bean id="myFileOfMyRepo" class="java.io.File">
<constructor-arg type="java.lang.String" value="C:\repoDat\repo.dat"/>
</bean>
<bean id="myRepo2" class="org.apache.camel.impl.FileStateRepository " factory-method="fileStateRepository" init-method="doStart">
<constructor-arg ref="myFileOfMyRepo"/>
</bean>
在这之后,我在Git中看到了Kafka骆驼消费者的代码。
offsetRepository.getState(serializeOffsetKey(topicPartition));
if (offsetState != null && !offsetState.isEmpty()) {
// The state contains the last read offset so you need to seek from the next one
long offset = deserializeOffsetValue(offsetState) + 1;
log.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
consumer.seek(topicPartition, offset);
}
有了这个,我设法读到了最后一个偏移量。我希望为Kafka增加这些额外的步骤。
我的spring boot项目有一个演示Kafka Streams API的应用程序。我可以使用以下命令使用主题中的所有消息 Kafka Streams API中使用KStream或ktable使用消息的类似命令是什么?我试过了 两者都不起作用。我确实创建了一个测试用例,用而不是流来使用,但它不起作用。代码上传到Github以供参考。任何帮助都会很好。
我想打印Flink已开始读取的Kafka主题的每个分区的起始偏移量?
我想从话题的一开始就开始消费。我已经将属性“AUTO\u OFFSET\u RESET\u CONFIG”设置为最早,但不知何故它仍然没有从一开始就读取。 如果我错过了什么,有什么想法吗?我每次都在创造一个新的消费群体。
有没有其他方法可以做到这一点?
我是 kafka 的新手,并试图了解是否有办法从上次使用的偏移量读取消息,但不是从头开始。 我正在写一个例子,这样我的意图就不会偏离。 有没有一种方法可以获取从上次使用的偏移量生成的消息。?
问题内容: 嘿,我正在尝试打开文件,仅从偏移量读取一定长度!我阅读了以下主题: 如何使用Java中的文件中的特定行号读取特定行? 在那儿,它说在不读取之前就不可能读取某行,但是我想知道字节! 是否可以从已知偏移量读取某些字节? 问题答案: RandomAccessFile提供一个功能: