当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka的Spring:如何搜索分区的ToEnd?

怀飞掣
2023-03-14
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我已经更改了包含@KafKalistener方法的@Service类,以实现文档中提到的接口

@Service
 public class MyAvroListener implements 
 ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware {

它有@KafKalistener注释的方法,我曾尝试在该方法中搜索分区的ToEnd

@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
    public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
    this.seekCallBack.get().seekToEnd(topic,0);
    try {
        for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {

我也尝试过寻找特定的偏移量(因为我一直卡在36833偏移量消息上)

@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
        public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
        this.seekCallBack.get().seek(topic,0,36900);
        try {
            for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    this.seekCallBack.set(consumerSeekCallback);
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

@Override
public void seek(String s, int i, long l) {

}

@Override
public void seekToBeginning(String s, int i) {

}

@Override
public void seekToEnd(String topic, int partition) {
    System.out.println("seekToEnd is hit for topic s = " + topic + " and partition i=" + partition);
}
    2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

我正在使用这里的Spring Kafka模板实现示例的片段,用于搜索偏移量、确认

正如这里提到的,是什么决定了Kafka消费者偏移量?,我不能使用auto.offset.reset属性从主题的末尾开始消费(除非我使用不同的ConsumerGroupID--在我的情况下这是不可能的)。我想知道是否可以利用现有的消费者群体来解决这个问题。

共有1个答案

贺季
2023-03-14

执行查找的时间太晚了--在获取记录的poll()之后;您需要在

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {

}

通过在其中调用ConsumerSeekCallback.SeekToEnd(...)。查找将在poll()提取记录之前发生。

您还可以使用kafka-consumer-groups命令行工具为组/主题/分区设置任意偏移量。

当前的启动版本是2.0.4,kafka 2.1.8。

此外,您不应该实现传递给您的回调。

文件看起来很清楚...

使用组管理时,在工作分配更改时调用第二个方法。例如,您可以通过调用回调来使用此方法来设置分区的初始偏移量;必须使用回调参数,而不是传入RegisterSeekCallback的参数。

...如果不是,我们应该改变什么?

 类似资料:
  • 我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制

  • 问题内容: 我正在尝试使用JavaScript中的两个字符串进行不区分大小写的搜索。 通常情况如下: 该标志将不区分大小写。 但是我需要搜索第二个字符串。没有标志,它可以完美地工作: 如果我在上面的示例中添加标志,它将搜索searchstring而不是变量“ searchstring”中的内容(下一个示例不起作用): 我该如何实现? 问题答案: 是的,使用而不是。调用的结果将返回匹配自身的实际字符

  • 问题内容: 我在索引文档中有一个字段,需要区分大小写地进行搜索。我正在使用匹配查询来获取结果。我的数据文件的一个示例是: 现在,当我给出以下查询时: 它给了我“ binoy”和“ Binoy”的匹配。我希望搜索区分大小写。缺省情况下,elasticsearch似乎不区分大小写。如何在Elasticsearch中使搜索区分大小写? 问题答案: 在映射中,您可以将字段定义为not_analyzed。

  • 我在索引文档中有一个字段,需要以区分大小写的方式进行搜索。我正在使用匹配查询来获取结果。我的数据文档的一个示例是: 现在当我给出以下查询时: 它给了我一个“比诺伊”对“比诺伊”的匹配。我希望搜索区分大小写。似乎默认情况下,elasticsearch似乎与大小写无关。如何在ElasticSearch中使搜索区分大小写?

  • 本文向大家介绍如何在Oracle中执行不区分大小写的搜索?,包括了如何在Oracle中执行不区分大小写的搜索?的使用技巧和注意事项,需要的朋友参考一下 问题: 您要在Oracle中执行不区分大小写的搜索。 解 处理案例问题的一种方法是使用内置的UPPER和LOWER函数。这些函数使您可以强制单个操作对字符串进行大小写转换 示例 在上面的示例中,将full_name1和full_name2首先转换为

  • 问题内容: 和其他比较运算符等的默认行为区分大小写。 是否可以使它们不区分大小写? 问题答案: 从10gR2开始,Oracle允许通过设置和会话参数来微调字符串比较的行为: 您还可以创建不区分大小写的索引: 该信息来自Oracle不区分大小写的搜索。文章提到了,但似乎也适用于旧版本。 在10gR2之前的版本中,这实际上是做不到的,如果不需要 区分重音符号的 搜索,通常的方法是只对列和搜索表达式都使