properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我已经更改了包含@KafKalistener方法的@Service类,以实现文档中提到的接口
@Service
public class MyAvroListener implements
ConsumerSeekAware.ConsumerSeekCallback,ConsumerSeekAware {
它有@KafKalistener注释的方法,我曾尝试在该方法中搜索分区的ToEnd
@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
this.seekCallBack.get().seekToEnd(topic,0);
try {
for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {
我也尝试过寻找特定的偏移量(因为我一直卡在36833偏移量消息上)
@KafkaListener(topics = "${topic}", containerFactory = "myAvroListenerFactory")
public void listen(final Acknowledgment ack, final List<ConsumerRecord<String, EclLogging>> messages) throws Exception {
this.seekCallBack.get().seek(topic,0,36900);
try {
for (ConsumerRecord<String, EclLogging> kafkaRecord : messages) {
private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
this.seekCallBack.set(consumerSeekCallback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}
@Override
public void seek(String s, int i, long l) {
}
@Override
public void seekToBeginning(String s, int i) {
}
@Override
public void seekToEnd(String topic, int partition) {
System.out.println("seekToEnd is hit for topic s = " + topic + " and partition i=" + partition);
}
2018-08-17 17:58:51.360 ERROR 18004 --- [ntainer#0-0-C-1] o.s.k.listener.BatchLoggingErrorHandler : Error while processing:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ri00-q-log-et-final-0 at offset 36833. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
我正在使用这里的Spring Kafka模板实现示例的片段,用于搜索偏移量、确认
正如这里提到的,是什么决定了Kafka消费者偏移量?,我不能使用auto.offset.reset属性从主题的末尾开始消费(除非我使用不同的ConsumerGroupID--在我的情况下这是不可能的)。我想知道是否可以利用现有的消费者群体来解决这个问题。
执行查找的时间太晚了--在获取记录的poll()
之后;您需要在
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
}
通过在其中调用ConsumerSeekCallback.SeekToEnd(...)
。查找将在poll()
提取记录之前发生。
您还可以使用kafka-consumer-groups
命令行工具为组/主题/分区设置任意偏移量。
当前的启动版本是2.0.4,kafka 2.1.8。
此外,您不应该实现传递给您的回调。
文件看起来很清楚...
使用组管理时,在工作分配更改时调用第二个方法。例如,您可以通过调用回调来使用此方法来设置分区的初始偏移量;必须使用回调参数,而不是传入RegisterSeekCallback的参数。
...如果不是,我们应该改变什么?
我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制
问题内容: 我正在尝试使用JavaScript中的两个字符串进行不区分大小写的搜索。 通常情况如下: 该标志将不区分大小写。 但是我需要搜索第二个字符串。没有标志,它可以完美地工作: 如果我在上面的示例中添加标志,它将搜索searchstring而不是变量“ searchstring”中的内容(下一个示例不起作用): 我该如何实现? 问题答案: 是的,使用而不是。调用的结果将返回匹配自身的实际字符
问题内容: 我在索引文档中有一个字段,需要区分大小写地进行搜索。我正在使用匹配查询来获取结果。我的数据文件的一个示例是: 现在,当我给出以下查询时: 它给了我“ binoy”和“ Binoy”的匹配。我希望搜索区分大小写。缺省情况下,elasticsearch似乎不区分大小写。如何在Elasticsearch中使搜索区分大小写? 问题答案: 在映射中,您可以将字段定义为not_analyzed。
我在索引文档中有一个字段,需要以区分大小写的方式进行搜索。我正在使用匹配查询来获取结果。我的数据文档的一个示例是: 现在当我给出以下查询时: 它给了我一个“比诺伊”对“比诺伊”的匹配。我希望搜索区分大小写。似乎默认情况下,elasticsearch似乎与大小写无关。如何在ElasticSearch中使搜索区分大小写?
本文向大家介绍如何在Oracle中执行不区分大小写的搜索?,包括了如何在Oracle中执行不区分大小写的搜索?的使用技巧和注意事项,需要的朋友参考一下 问题: 您要在Oracle中执行不区分大小写的搜索。 解 处理案例问题的一种方法是使用内置的UPPER和LOWER函数。这些函数使您可以强制单个操作对字符串进行大小写转换 示例 在上面的示例中,将full_name1和full_name2首先转换为
问题内容: 和其他比较运算符等的默认行为区分大小写。 是否可以使它们不区分大小写? 问题答案: 从10gR2开始,Oracle允许通过设置和会话参数来微调字符串比较的行为: 您还可以创建不区分大小写的索引: 该信息来自Oracle不区分大小写的搜索。文章提到了,但似乎也适用于旧版本。 在10gR2之前的版本中,这实际上是做不到的,如果不需要 区分重音符号的 搜索,通常的方法是只对列和搜索表达式都使